This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch release-0.13.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit ee7764b9749d929917a4e9641f4a32f269674100 Author: Alexey Kudinkin <[email protected]> AuthorDate: Fri Jan 27 18:56:32 2023 -0800 [HUDI-5023] Switching default Write Executor type to `SIMPLE` (#7476) This change switches the default Write Executor to SIMPLE, i.e. one bypassing reliance on any kind of queue (either BoundedInMemory or Disruptor's). This should considerably trim down on runtime (compared to BIMQ) and on compute wasted (compared to BIMQ and Disruptor), since it eliminates the unnecessary intermediary "staging" of the records in the queue (for example, in Spark such in-memory enqueueing already occurs at the ingress points, i.e. shuffling), and allows record writing to be handled in one pass (even avoiding making copies of the records in the future). --- .../org/apache/hudi/config/HoodieWriteConfig.java | 48 +++++++++++--------- .../hudi/execution/HoodieLazyInsertIterable.java | 30 +++++++------ .../java/org/apache/hudi/util/ExecutorFactory.java | 24 +++++----- .../hudi/execution/FlinkLazyInsertIterable.java | 2 +- .../hudi/execution/JavaLazyInsertIterable.java | 2 +- .../hudi/execution/SparkLazyInsertIterable.java | 2 +- .../TestBoundedInMemoryExecutorInSpark.java | 28 ++++++------ .../hudi/execution/TestBoundedInMemoryQueue.java | 23 ++++++---- .../execution/TestDisruptorExecutionInSpark.java | 48 ++++++++++---------- .../hudi/execution/TestDisruptorMessageQueue.java | 30 +++++++------ .../hudi/execution/TestSimpleExecutionInSpark.java | 51 +++++++++------------- .../common/util/queue/BoundedInMemoryExecutor.java | 2 +- .../hudi/common/util/queue/DisruptorExecutor.java | 19 +++++--- .../common/util/queue/DisruptorMessageQueue.java | 18 +++++--- .../hudi/common/util/queue/ExecutorType.java | 15 +------ ...mpleHoodieExecutor.java => SimpleExecutor.java} | 51 ++++++++++------------ .../common/util/queue/WaitStrategyFactory.java | 8 ++-- 17 files changed, 200 insertions(+), 201 deletions(-) diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 2f36aa725e3..b9d7c800250 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -54,6 +54,8 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.common.util.VisibleForTesting; +import org.apache.hudi.common.util.queue.DisruptorWaitStrategyType; import org.apache.hudi.common.util.queue.ExecutorType; import org.apache.hudi.config.metrics.HoodieMetricsCloudWatchConfig; import org.apache.hudi.config.metrics.HoodieMetricsConfig; @@ -97,8 +99,7 @@ import java.util.Properties; import java.util.function.Supplier; import java.util.stream.Collectors; -import static org.apache.hudi.common.util.queue.ExecutorType.BOUNDED_IN_MEMORY; -import static org.apache.hudi.common.util.queue.ExecutorType.DISRUPTOR; +import static org.apache.hudi.common.util.queue.ExecutorType.SIMPLE; import static org.apache.hudi.config.HoodieCleanConfig.CLEANER_POLICY; import static org.apache.hudi.table.marker.ConflictDetectionUtils.getDefaultEarlyConflictDetectionStrategy; @@ -158,10 +159,10 @@ public class HoodieWriteConfig extends HoodieConfig { .withDocumentation("Key generator class, that implements `org.apache.hudi.keygen.KeyGenerator` " + "extract a key out of incoming records."); - public static final ConfigProperty<String> EXECUTOR_TYPE = ConfigProperty + public static final ConfigProperty<String> WRITE_EXECUTOR_TYPE = ConfigProperty .key("hoodie.write.executor.type") - .defaultValue(BOUNDED_IN_MEMORY.name()) - .withValidValues(BOUNDED_IN_MEMORY.name(), DISRUPTOR.name()) + 
.defaultValue(SIMPLE.name()) + .withValidValues(Arrays.stream(ExecutorType.values()).map(Enum::name).toArray(String[]::new)) .sinceVersion("0.13.0") .withDocumentation("Set executor which orchestrates concurrent producers and consumers communicating through a message queue." + "BOUNDED_IN_MEMORY(default): Use LinkedBlockingQueue as a bounded in-memory queue, this queue will use extra lock to balance producers and consumer" @@ -271,15 +272,15 @@ public class HoodieWriteConfig extends HoodieConfig { .defaultValue(String.valueOf(4 * 1024 * 1024)) .withDocumentation("Size of in-memory buffer used for parallelizing network reads and lake storage writes."); - public static final ConfigProperty<String> WRITE_DISRUPTOR_BUFFER_SIZE = ConfigProperty + public static final ConfigProperty<String> WRITE_EXECUTOR_DISRUPTOR_BUFFER_SIZE = ConfigProperty .key("hoodie.write.executor.disruptor.buffer.size") .defaultValue(String.valueOf(1024)) .sinceVersion("0.13.0") .withDocumentation("The size of the Disruptor Executor ring buffer, must be power of 2"); - public static final ConfigProperty<String> WRITE_WAIT_STRATEGY = ConfigProperty + public static final ConfigProperty<String> WRITE_EXECUTOR_DISRUPTOR_WAIT_STRATEGY = ConfigProperty .key("hoodie.write.executor.disruptor.wait.strategy") - .defaultValue("BLOCKING_WAIT") + .defaultValue(DisruptorWaitStrategyType.BLOCKING_WAIT.name()) .sinceVersion("0.13.0") .withDocumentation("Strategy employed for making Disruptor Executor wait on a cursor. 
Other options are " + "SLEEPING_WAIT, it attempts to be conservative with CPU usage by using a simple busy wait loop" @@ -1107,7 +1108,7 @@ public class HoodieWriteConfig extends HoodieConfig { } public ExecutorType getExecutorType() { - return ExecutorType.valueOf(getStringOrDefault(EXECUTOR_TYPE).toUpperCase(Locale.ROOT)); + return ExecutorType.valueOf(getStringOrDefault(WRITE_EXECUTOR_TYPE).toUpperCase(Locale.ROOT)); } public boolean isCDCEnabled() { @@ -1175,12 +1176,12 @@ public class HoodieWriteConfig extends HoodieConfig { return Integer.parseInt(getStringOrDefault(WRITE_BUFFER_LIMIT_BYTES_VALUE)); } - public Option<String> getWriteExecutorWaitStrategy() { - return Option.of(getString(WRITE_WAIT_STRATEGY)); + public String getWriteExecutorDisruptorWaitStrategy() { + return getStringOrDefault(WRITE_EXECUTOR_DISRUPTOR_WAIT_STRATEGY); } - public Option<Integer> getDisruptorWriteBufferSize() { - return Option.of(Integer.parseInt(getStringOrDefault(WRITE_DISRUPTOR_BUFFER_SIZE))); + public Integer getWriteExecutorDisruptorWriteBufferSize() { + return Integer.parseInt(getStringOrDefault(WRITE_EXECUTOR_DISRUPTOR_BUFFER_SIZE)); } public boolean shouldCombineBeforeInsert() { @@ -1987,7 +1988,7 @@ public class HoodieWriteConfig extends HoodieConfig { public String getDatadogApiKey() { if (props.containsKey(HoodieMetricsDatadogConfig.API_KEY.key())) { return getString(HoodieMetricsDatadogConfig.API_KEY); - + } else { Supplier<String> apiKeySupplier = ReflectionUtils.loadClass( getString(HoodieMetricsDatadogConfig.API_KEY_SUPPLIER)); @@ -2481,7 +2482,7 @@ public class HoodieWriteConfig extends HoodieConfig { } public Builder withExecutorType(String executorClass) { - writeConfig.setValue(EXECUTOR_TYPE, executorClass); + writeConfig.setValue(WRITE_EXECUTOR_TYPE, executorClass); return this; } @@ -2536,13 +2537,13 @@ public class HoodieWriteConfig extends HoodieConfig { return this; } - public Builder withWriteWaitStrategy(String waitStrategy) { - 
writeConfig.setValue(WRITE_WAIT_STRATEGY, String.valueOf(waitStrategy)); + public Builder withWriteExecutorDisruptorWaitStrategy(String waitStrategy) { + writeConfig.setValue(WRITE_EXECUTOR_DISRUPTOR_WAIT_STRATEGY, String.valueOf(waitStrategy)); return this; } - public Builder withWriteBufferSize(int size) { - writeConfig.setValue(WRITE_DISRUPTOR_BUFFER_SIZE, String.valueOf(size)); + public Builder withWriteExecutorDisruptorWriteBufferSize(long size) { + writeConfig.setValue(WRITE_EXECUTOR_DISRUPTOR_BUFFER_SIZE, String.valueOf(size)); return this; } @@ -2970,8 +2971,15 @@ public class HoodieWriteConfig extends HoodieConfig { } public HoodieWriteConfig build() { + return build(true); + } + + @VisibleForTesting + public HoodieWriteConfig build(boolean shouldValidate) { setDefaults(); - validate(); + if (shouldValidate) { + validate(); + } // Build WriteConfig at the end return new HoodieWriteConfig(engineType, writeConfig.getProps()); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java index 20f75f63c52..991e52982cf 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java @@ -21,9 +21,9 @@ package org.apache.hudi.execution; import org.apache.avro.Schema; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.utils.LazyIterableIterator; -import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.queue.ExecutorType; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.io.CreateHandleFactory; import org.apache.hudi.io.WriteHandleFactory; @@ -90,23 +90,25 @@ public abstract 
class HoodieLazyInsertIterable<T> } } - static <T> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> getCloningTransformer(Schema schema, - HoodieWriteConfig config) { - return getCloningTransformerInternal(schema, config.getProps()); + static <T> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> getTransformer(Schema schema, + HoodieWriteConfig writeConfig) { + return getTransformerInternal(schema, writeConfig); } - static <T> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> getCloningTransformer(Schema schema) { - return getCloningTransformerInternal(schema, new TypedProperties()); - } + private static <T> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> getTransformerInternal(Schema schema, + HoodieWriteConfig writeConfig) { + // NOTE: Whether record have to be cloned here is determined based on the executor type used + // for writing: executors relying on an inner queue, will be keeping references to the records + // and therefore in the environments where underlying buffer holding the record could be + // reused (for ex, Spark) we need to make sure that we get a clean copy of + // it since these records will be subsequently buffered (w/in the in-memory queue); + // Only case when we don't need to make a copy is when using [[SimpleExecutor]] which + // is guaranteed to not hold on to references to any records + boolean shouldClone = writeConfig.getExecutorType() != ExecutorType.SIMPLE; - private static <T> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> getCloningTransformerInternal(Schema schema, - TypedProperties props) { return record -> { - // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific - // payload pointing into a shared, mutable (underlying) buffer we get a clean copy of - // it since these records will be subsequently buffered (w/in the in-memory queue) - HoodieRecord<T> clonedRecord = record.copy(); - return new 
HoodieInsertValueGenResult(clonedRecord, schema, props); + HoodieRecord<T> clonedRecord = shouldClone ? record.copy() : record; + return new HoodieInsertValueGenResult(clonedRecord, schema, writeConfig.getProps()); }; } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/ExecutorFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/ExecutorFactory.java index b9e7f06f848..bd192f649db 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/ExecutorFactory.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/ExecutorFactory.java @@ -20,11 +20,11 @@ package org.apache.hudi.util; import org.apache.hudi.common.util.Functions; import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor; -import org.apache.hudi.common.util.queue.SimpleHoodieExecutor; import org.apache.hudi.common.util.queue.DisruptorExecutor; import org.apache.hudi.common.util.queue.ExecutorType; -import org.apache.hudi.common.util.queue.HoodieExecutor; import org.apache.hudi.common.util.queue.HoodieConsumer; +import org.apache.hudi.common.util.queue.HoodieExecutor; +import org.apache.hudi.common.util.queue.SimpleExecutor; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; @@ -34,17 +34,17 @@ import java.util.function.Function; public class ExecutorFactory { public static <I, O, E> HoodieExecutor<E> create(HoodieWriteConfig hoodieConfig, - Iterator<I> inputItr, - HoodieConsumer<O, E> consumer, - Function<I, O> transformFunction) { + Iterator<I> inputItr, + HoodieConsumer<O, E> consumer, + Function<I, O> transformFunction) { return create(hoodieConfig, inputItr, consumer, transformFunction, Functions.noop()); } public static <I, O, E> HoodieExecutor<E> create(HoodieWriteConfig hoodieConfig, - Iterator<I> inputItr, - HoodieConsumer<O, E> consumer, - Function<I, O> transformFunction, - Runnable preExecuteRunnable) { + Iterator<I> inputItr, + 
HoodieConsumer<O, E> consumer, + Function<I, O> transformFunction, + Runnable preExecuteRunnable) { ExecutorType executorType = hoodieConfig.getExecutorType(); switch (executorType) { @@ -52,10 +52,10 @@ public class ExecutorFactory { return new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), inputItr, consumer, transformFunction, preExecuteRunnable); case DISRUPTOR: - return new DisruptorExecutor<>(hoodieConfig.getDisruptorWriteBufferSize(), inputItr, consumer, - transformFunction, hoodieConfig.getWriteExecutorWaitStrategy(), preExecuteRunnable); + return new DisruptorExecutor<>(hoodieConfig.getWriteExecutorDisruptorWriteBufferSize(), inputItr, consumer, + transformFunction, hoodieConfig.getWriteExecutorDisruptorWaitStrategy(), preExecuteRunnable); case SIMPLE: - return new SimpleHoodieExecutor<>(inputItr, consumer, transformFunction); + return new SimpleExecutor<>(inputItr, consumer, transformFunction); default: throw new HoodieException("Unsupported Executor Type " + executorType); } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java index 69797166249..6e573ec9432 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java @@ -60,7 +60,7 @@ public class FlinkLazyInsertIterable<T> extends HoodieLazyInsertIterable<T> { try { final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema()); bufferedIteratorExecutor = ExecutorFactory.create(hoodieConfig, inputItr, getExplicitInsertHandler(), - getCloningTransformer(schema, hoodieConfig)); + getTransformer(schema, hoodieConfig)); final List<WriteStatus> result = bufferedIteratorExecutor.execute(); checkState(result != null && !result.isEmpty()); return result; diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java index 5113b340680..d2e813a506b 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java @@ -64,7 +64,7 @@ public class JavaLazyInsertIterable<T> extends HoodieLazyInsertIterable<T> { try { final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema()); bufferedIteratorExecutor = - ExecutorFactory.create(hoodieConfig, inputItr, getInsertHandler(), getCloningTransformer(schema)); + ExecutorFactory.create(hoodieConfig, inputItr, getInsertHandler(), getTransformer(schema, hoodieConfig)); final List<WriteStatus> result = bufferedIteratorExecutor.execute(); checkState(result != null && !result.isEmpty()); return result; diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java index 7cb2b27e171..147b5cf6b33 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java @@ -87,7 +87,7 @@ public class SparkLazyInsertIterable<T> extends HoodieLazyInsertIterable<T> { } bufferedIteratorExecutor = ExecutorFactory.create(hoodieConfig, inputItr, getInsertHandler(), - getCloningTransformer(schema, hoodieConfig), hoodieTable.getPreExecuteRunnable()); + getTransformer(schema, hoodieConfig), hoodieTable.getPreExecuteRunnable()); final List<WriteStatus> result = bufferedIteratorExecutor.execute(); checkState(result != null && !result.isEmpty()); diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java index 3ccdf1ec106..eb61cb43312 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java @@ -23,6 +23,7 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor; +import org.apache.hudi.common.util.queue.ExecutorType; import org.apache.hudi.common.util.queue.HoodieConsumer; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; @@ -41,18 +42,21 @@ import java.util.List; import scala.Tuple2; -import static org.apache.hudi.execution.HoodieLazyInsertIterable.getCloningTransformer; +import static org.apache.hudi.execution.HoodieLazyInsertIterable.getTransformer; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; public class TestBoundedInMemoryExecutorInSpark extends HoodieClientTestHarness { private final String instantTime = HoodieActiveTimeline.createNewInstantTime(); + private final HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + .withExecutorType(ExecutorType.BOUNDED_IN_MEMORY.name()) + .withWriteBufferLimitBytes(1024) + .build(false); + @BeforeEach public void setUp() throws Exception { initTestDataGenerator(); @@ -74,8 +78,6 @@ public class 
TestBoundedInMemoryExecutorInSpark extends HoodieClientTestHarness final int recordNumber = 100; final List<HoodieRecord> hoodieRecords = dataGen.generateInserts(instantTime, recordNumber); - HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class); - when(hoodieWriteConfig.getWriteBufferLimitBytes()).thenReturn(1024); HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> consumer = new HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>() { @@ -94,8 +96,8 @@ public class TestBoundedInMemoryExecutorInSpark extends HoodieClientTestHarness BoundedInMemoryExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> executor = null; try { - executor = new BoundedInMemoryExecutor(hoodieWriteConfig.getWriteBufferLimitBytes(), hoodieRecords.iterator(), consumer, - getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA), getPreExecuteRunnable()); + executor = new BoundedInMemoryExecutor(writeConfig.getWriteBufferLimitBytes(), hoodieRecords.iterator(), consumer, + getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig), getPreExecuteRunnable()); int result = executor.execute(); assertEquals(100, result); @@ -113,8 +115,6 @@ public class TestBoundedInMemoryExecutorInSpark extends HoodieClientTestHarness public void testInterruptExecutor() { final List<HoodieRecord> hoodieRecords = dataGen.generateInserts(instantTime, 100); - HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class); - when(hoodieWriteConfig.getWriteBufferLimitBytes()).thenReturn(1024); HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> consumer = new HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>() { @@ -136,8 +136,8 @@ public class TestBoundedInMemoryExecutorInSpark extends HoodieClientTestHarness }; BoundedInMemoryExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> executor = 
- new BoundedInMemoryExecutor(hoodieWriteConfig.getWriteBufferLimitBytes(), hoodieRecords.iterator(), consumer, - getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA), getPreExecuteRunnable()); + new BoundedInMemoryExecutor(writeConfig.getWriteBufferLimitBytes(), hoodieRecords.iterator(), consumer, + getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig), getPreExecuteRunnable()); // Interrupt the current thread (therefore triggering executor to throw as soon as it // invokes [[get]] on the [[CompletableFuture]]) @@ -154,8 +154,6 @@ public class TestBoundedInMemoryExecutorInSpark extends HoodieClientTestHarness @Test public void testExecutorTermination() { - HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class); - when(hoodieWriteConfig.getWriteBufferLimitBytes()).thenReturn(1024); Iterator<GenericRecord> unboundedRecordIter = new Iterator<GenericRecord>() { @Override public boolean hasNext() { @@ -181,8 +179,8 @@ public class TestBoundedInMemoryExecutorInSpark extends HoodieClientTestHarness }; BoundedInMemoryExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> executor = - new BoundedInMemoryExecutor(hoodieWriteConfig.getWriteBufferLimitBytes(), unboundedRecordIter, - consumer, getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA), + new BoundedInMemoryExecutor(writeConfig.getWriteBufferLimitBytes(), unboundedRecordIter, + consumer, getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig), getPreExecuteRunnable()); executor.shutdownNow(); boolean terminatedGracefully = executor.awaitTermination(); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java index c9be18c9da3..50ba44c5688 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java +++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java @@ -27,9 +27,11 @@ import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.SizeEstimator; import org.apache.hudi.common.util.queue.BoundedInMemoryQueue; +import org.apache.hudi.common.util.queue.ExecutorType; import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer; import org.apache.hudi.common.util.queue.HoodieProducer; import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer; +import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.testutils.HoodieClientTestHarness; @@ -54,7 +56,7 @@ import java.util.stream.IntStream; import scala.Tuple2; -import static org.apache.hudi.execution.HoodieLazyInsertIterable.getCloningTransformer; +import static org.apache.hudi.execution.HoodieLazyInsertIterable.getTransformer; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -65,6 +67,11 @@ public class TestBoundedInMemoryQueue extends HoodieClientTestHarness { private final String instantTime = HoodieActiveTimeline.createNewInstantTime(); + private final HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + .withExecutorType(ExecutorType.BOUNDED_IN_MEMORY.name()) + .withWriteBufferLimitBytes(1024) + .build(false); + @BeforeEach public void setUp() throws Exception { initTestDataGenerator(); @@ -85,7 +92,7 @@ public class TestBoundedInMemoryQueue extends HoodieClientTestHarness { final int numRecords = 128; final List<HoodieRecord> hoodieRecords = dataGen.generateInserts(instantTime, numRecords); final BoundedInMemoryQueue<HoodieRecord, HoodieLazyInsertIterable.HoodieInsertValueGenResult> queue = - new BoundedInMemoryQueue(FileIOUtils.KB, 
getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA)); + new BoundedInMemoryQueue(FileIOUtils.KB, getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig)); // Produce Future<Boolean> resFuture = executorService.submit(() -> { new IteratorBasedQueueProducer<>(hoodieRecords.iterator()).produce(queue); @@ -125,7 +132,7 @@ public class TestBoundedInMemoryQueue extends HoodieClientTestHarness { final List<List<HoodieRecord>> recs = new ArrayList<>(); final BoundedInMemoryQueue<HoodieRecord, HoodieLazyInsertIterable.HoodieInsertValueGenResult> queue = - new BoundedInMemoryQueue(FileIOUtils.KB, getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA)); + new BoundedInMemoryQueue(FileIOUtils.KB, getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig)); // Record Key to <Producer Index, Rec Index within a producer> Map<String, Tuple2<Integer, Integer>> keyToProducerAndIndexMap = new HashMap<>(); @@ -220,11 +227,11 @@ public class TestBoundedInMemoryQueue extends HoodieClientTestHarness { final int recordLimit = 5; final SizeEstimator<HoodieLazyInsertIterable.HoodieInsertValueGenResult> sizeEstimator = new DefaultSizeEstimator<>(); HoodieLazyInsertIterable.HoodieInsertValueGenResult genResult = - getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA).apply((HoodieAvroRecord) hoodieRecords.get(0)); + getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig).apply(hoodieRecords.get(0)); final long objSize = sizeEstimator.sizeEstimate(genResult); final long memoryLimitInBytes = recordLimit * objSize; final BoundedInMemoryQueue<HoodieRecord, HoodieLazyInsertIterable.HoodieInsertValueGenResult> queue = - new BoundedInMemoryQueue(memoryLimitInBytes, getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA)); + new BoundedInMemoryQueue(memoryLimitInBytes, getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig)); // Produce executorService.submit(() -> { @@ -269,7 +276,7 @@ public class TestBoundedInMemoryQueue extends 
HoodieClientTestHarness { final SizeEstimator<Tuple2<HoodieRecord, Option<IndexedRecord>>> sizeEstimator = new DefaultSizeEstimator<>(); // queue memory limit HoodieLazyInsertIterable.HoodieInsertValueGenResult genResult = - getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA).apply(hoodieRecords.get(0)); + getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig).apply(hoodieRecords.get(0)); final long objSize = sizeEstimator.sizeEstimate(new Tuple2(genResult.getResult(), genResult.getResult().toIndexedRecord(HoodieTestDataGenerator.AVRO_SCHEMA, new Properties()))); final long memoryLimitInBytes = 4 * objSize; @@ -277,7 +284,7 @@ public class TestBoundedInMemoryQueue extends HoodieClientTestHarness { // stops and throws // correct exception back. BoundedInMemoryQueue<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>> queue1 = - new BoundedInMemoryQueue(memoryLimitInBytes, getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA)); + new BoundedInMemoryQueue(memoryLimitInBytes, getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig)); // Produce Future<Boolean> resFuture = executorService.submit(() -> { @@ -305,7 +312,7 @@ public class TestBoundedInMemoryQueue extends HoodieClientTestHarness { when(mockHoodieRecordsIterator.hasNext()).thenReturn(true); when(mockHoodieRecordsIterator.next()).thenThrow(expectedException); BoundedInMemoryQueue<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>> queue2 = - new BoundedInMemoryQueue(memoryLimitInBytes, getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA)); + new BoundedInMemoryQueue(memoryLimitInBytes, getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig)); // Produce Future<Boolean> res = executorService.submit(() -> { diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorExecutionInSpark.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorExecutionInSpark.java index 
674aca74153..55c2325b137 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorExecutionInSpark.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorExecutionInSpark.java @@ -18,13 +18,11 @@ package org.apache.hudi.execution; -import org.apache.avro.generic.IndexedRecord; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; -import org.apache.hudi.common.testutils.HoodieTestDataGenerator; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.queue.HoodieConsumer; import org.apache.hudi.common.util.queue.DisruptorExecutor; +import org.apache.hudi.common.util.queue.ExecutorType; +import org.apache.hudi.common.util.queue.HoodieConsumer; import org.apache.hudi.common.util.queue.WaitStrategyFactory; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; @@ -38,21 +36,23 @@ import org.junit.jupiter.api.Timeout; import java.util.ArrayList; import java.util.List; +import java.util.function.Function; -import scala.Tuple2; - -import static org.apache.hudi.execution.HoodieLazyInsertIterable.getCloningTransformer; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; public class TestDisruptorExecutionInSpark extends HoodieClientTestHarness { private final String instantTime = HoodieActiveTimeline.createNewInstantTime(); + + private final HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + .withExecutorType(ExecutorType.DISRUPTOR.name()) + .withWriteExecutorDisruptorWriteBufferSize(8) + .build(false); + @BeforeEach public void setUp() throws Exception { initTestDataGenerator(); @@ -75,16 
+75,14 @@ public class TestDisruptorExecutionInSpark extends HoodieClientTestHarness { final List<HoodieRecord> hoodieRecords = dataGen.generateInserts(instantTime, 128); final List<HoodieRecord> consumedRecords = new ArrayList<>(); - HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class); - when(hoodieWriteConfig.getDisruptorWriteBufferSize()).thenReturn(Option.of(8)); - HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> consumer = - new HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>() { + HoodieConsumer<HoodieRecord, Integer> consumer = + new HoodieConsumer<HoodieRecord, Integer>() { private int count = 0; @Override - public void consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> record) { - consumedRecords.add(record.getResult()); + public void consume(HoodieRecord record) { + consumedRecords.add(record); count++; } @@ -93,11 +91,11 @@ public class TestDisruptorExecutionInSpark extends HoodieClientTestHarness { return count; } }; - DisruptorExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> exec = null; + DisruptorExecutor<HoodieRecord, HoodieRecord, Integer> exec = null; try { - exec = new DisruptorExecutor(hoodieWriteConfig.getDisruptorWriteBufferSize(), hoodieRecords.iterator(), consumer, - getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA), Option.of(WaitStrategyFactory.DEFAULT_STRATEGY), getPreExecuteRunnable()); + exec = new DisruptorExecutor<>(writeConfig.getWriteExecutorDisruptorWriteBufferSize(), hoodieRecords.iterator(), consumer, + Function.identity(), WaitStrategyFactory.DEFAULT_STRATEGY, getPreExecuteRunnable()); int result = exec.execute(); // It should buffer and write 100 records assertEquals(128, result); @@ -123,13 +121,11 @@ public class TestDisruptorExecutionInSpark extends HoodieClientTestHarness { public void testInterruptExecutor() { final List<HoodieRecord> hoodieRecords = 
dataGen.generateInserts(instantTime, 100); - HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class); - when(hoodieWriteConfig.getDisruptorWriteBufferSize()).thenReturn(Option.of(1024)); - HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> consumer = - new HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>() { + HoodieConsumer<HoodieRecord, Integer> consumer = + new HoodieConsumer<HoodieRecord, Integer>() { @Override - public void consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> record) { + public void consume(HoodieRecord record) { try { Thread.currentThread().wait(); } catch (InterruptedException ie) { @@ -143,9 +139,9 @@ public class TestDisruptorExecutionInSpark extends HoodieClientTestHarness { } }; - DisruptorExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> - executor = new DisruptorExecutor(hoodieWriteConfig.getDisruptorWriteBufferSize(), hoodieRecords.iterator(), consumer, - getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA), Option.of(WaitStrategyFactory.DEFAULT_STRATEGY), getPreExecuteRunnable()); + DisruptorExecutor<HoodieRecord, HoodieRecord, Integer> + executor = new DisruptorExecutor<>(1024, hoodieRecords.iterator(), consumer, + Function.identity(), WaitStrategyFactory.DEFAULT_STRATEGY, getPreExecuteRunnable()); try { Thread.currentThread().interrupt(); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java index 2ff9d02926b..7344ccd89fd 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.util.Option; import 
org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.common.util.queue.DisruptorExecutor; import org.apache.hudi.common.util.queue.DisruptorMessageQueue; +import org.apache.hudi.common.util.queue.ExecutorType; import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer; import org.apache.hudi.common.util.queue.HoodieConsumer; import org.apache.hudi.common.util.queue.HoodieProducer; @@ -56,17 +57,20 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import static org.apache.hudi.exception.ExceptionUtil.getRootCause; -import static org.apache.hudi.execution.HoodieLazyInsertIterable.getCloningTransformer; +import static org.apache.hudi.execution.HoodieLazyInsertIterable.getTransformer; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; public class TestDisruptorMessageQueue extends HoodieClientTestHarness { private final String instantTime = HoodieActiveTimeline.createNewInstantTime(); + private final HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() + .withExecutorType(ExecutorType.DISRUPTOR.name()) + .withWriteExecutorDisruptorWriteBufferSize(16) + .build(false); + @BeforeEach public void setUp() throws Exception { initTestDataGenerator(); @@ -108,8 +112,6 @@ public class TestDisruptorMessageQueue extends HoodieClientTestHarness { } }); - HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class); - when(hoodieWriteConfig.getDisruptorWriteBufferSize()).thenReturn(Option.of(16)); HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> consumer = new HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>() { @@ -137,8 +139,8 @@ public class TestDisruptorMessageQueue extends HoodieClientTestHarness { 
DisruptorExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> exec = null; try { - exec = new DisruptorExecutor(hoodieWriteConfig.getDisruptorWriteBufferSize(), hoodieRecords.iterator(), consumer, - getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA), Option.of(WaitStrategyFactory.DEFAULT_STRATEGY), getPreExecuteRunnable()); + exec = new DisruptorExecutor(writeConfig.getWriteExecutorDisruptorWriteBufferSize(), hoodieRecords.iterator(), consumer, + getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig), WaitStrategyFactory.DEFAULT_STRATEGY, getPreExecuteRunnable()); int result = exec.execute(); // It should buffer and write 100 records assertEquals(100, result); @@ -167,8 +169,8 @@ public class TestDisruptorMessageQueue extends HoodieClientTestHarness { final List<List<HoodieRecord>> recs = new ArrayList<>(); final DisruptorMessageQueue<HoodieRecord, HoodieLazyInsertIterable.HoodieInsertValueGenResult> queue = - new DisruptorMessageQueue(Option.of(1024), getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA), - Option.of("BLOCKING_WAIT"), numProducers, new Runnable() { + new DisruptorMessageQueue(1024, getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig), + "BLOCKING_WAIT", numProducers, new Runnable() { @Override public void run() { // do nothing. @@ -282,8 +284,8 @@ public class TestDisruptorMessageQueue extends HoodieClientTestHarness { final int numProducers = 40; final DisruptorMessageQueue<HoodieRecord, HoodieLazyInsertIterable.HoodieInsertValueGenResult> queue = - new DisruptorMessageQueue(Option.of(1024), getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA), - Option.of("BLOCKING_WAIT"), numProducers, new Runnable() { + new DisruptorMessageQueue(1024, getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig), + "BLOCKING_WAIT", numProducers, new Runnable() { @Override public void run() { // do nothing. 
@@ -324,9 +326,9 @@ public class TestDisruptorMessageQueue extends HoodieClientTestHarness { } }; - DisruptorExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> exec = new DisruptorExecutor(Option.of(1024), - producers, consumer, getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA), - Option.of(WaitStrategyFactory.DEFAULT_STRATEGY), getPreExecuteRunnable()); + DisruptorExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> exec = new DisruptorExecutor(1024, + producers, consumer, getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig), + WaitStrategyFactory.DEFAULT_STRATEGY, getPreExecuteRunnable()); final Throwable thrown = assertThrows(HoodieException.class, exec::execute, "exception is expected"); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestSimpleExecutionInSpark.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestSimpleExecutionInSpark.java index bbc85efd376..830577463b2 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestSimpleExecutionInSpark.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestSimpleExecutionInSpark.java @@ -18,8 +18,6 @@ package org.apache.hudi.execution; -import static org.apache.hudi.execution.HoodieLazyInsertIterable.getCloningTransformer; - import org.apache.avro.generic.IndexedRecord; import org.apache.hudi.common.model.HoodieAvroRecord; import org.apache.hudi.common.model.HoodieRecord; @@ -27,8 +25,7 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.queue.HoodieConsumer; -import org.apache.hudi.common.util.queue.SimpleHoodieExecutor; -import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.common.util.queue.SimpleExecutor; import 
org.apache.hudi.exception.HoodieException; import org.apache.hudi.testutils.HoodieClientTestHarness; import org.junit.jupiter.api.AfterEach; @@ -41,16 +38,13 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; - -import scala.Tuple2; +import java.util.function.Function; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -public class TestSimpleExecutionInSpark extends HoodieClientTestHarness { +public class TestSimpleExecutionInSpark extends HoodieClientTestHarness { private final String instantTime = HoodieActiveTimeline.createNewInstantTime(); @@ -71,15 +65,13 @@ public class TestSimpleExecutionInSpark extends HoodieClientTestHarness { final List<HoodieRecord> hoodieRecords = dataGen.generateInserts(instantTime, 128); final List<HoodieRecord> consumedRecords = new ArrayList<>(); - HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class); - when(hoodieWriteConfig.getDisruptorWriteBufferSize()).thenReturn(Option.of(8)); - HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> consumer = - new HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>() { + HoodieConsumer<HoodieRecord, Integer> consumer = + new HoodieConsumer<HoodieRecord, Integer>() { private int count = 0; @Override - public void consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> record) throws Exception { - consumedRecords.add(record.getResult()); + public void consume(HoodieRecord record) throws Exception { + consumedRecords.add(record); count++; } @@ -88,10 +80,10 @@ public class TestSimpleExecutionInSpark extends HoodieClientTestHarness { return count; } }; - SimpleHoodieExecutor<HoodieRecord, 
Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> exec = null; + SimpleExecutor<HoodieRecord, HoodieRecord, Integer> exec = null; try { - exec = new SimpleHoodieExecutor(hoodieRecords.iterator(), consumer, getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA)); + exec = new SimpleExecutor<>(hoodieRecords.iterator(), consumer, Function.identity()); int result = exec.execute(); // It should buffer and write 128 records @@ -133,16 +125,16 @@ public class TestSimpleExecutionInSpark extends HoodieClientTestHarness { } }); - HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> consumer = - new HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>() { + HoodieConsumer<HoodieRecord, Integer> consumer = + new HoodieConsumer<HoodieRecord, Integer>() { private int count = 0; @Override - public void consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> record) throws Exception { + public void consume(HoodieRecord record) throws Exception { count++; - afterRecord.add((HoodieAvroRecord) record.getResult()); + afterRecord.add((HoodieAvroRecord) record); try { - IndexedRecord indexedRecord = (IndexedRecord)((HoodieAvroRecord) record.getResult()) + IndexedRecord indexedRecord = (IndexedRecord)((HoodieAvroRecord) record) .getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA).get(); afterIndexedRecord.add(indexedRecord); } catch (IOException e) { @@ -156,10 +148,10 @@ public class TestSimpleExecutionInSpark extends HoodieClientTestHarness { } }; - SimpleHoodieExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> exec = null; + SimpleExecutor<HoodieRecord, HoodieRecord, Integer> exec = null; try { - exec = new SimpleHoodieExecutor(hoodieRecords.iterator(), consumer, getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA)); + exec = new SimpleExecutor<>(hoodieRecords.iterator(), consumer, Function.identity()); int result = exec.execute(); 
assertEquals(100, result); @@ -186,14 +178,13 @@ public class TestSimpleExecutionInSpark extends HoodieClientTestHarness { List<HoodieRecord> pRecs = dataGen.generateInserts(instantTime, numRecords); InnerIterator iterator = new InnerIterator(pRecs.iterator(), errorMessage, numRecords / 10); - HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> consumer = - new HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>() { + HoodieConsumer<HoodieRecord, Integer> consumer = + new HoodieConsumer<HoodieRecord, Integer>() { int count = 0; @Override - public void consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> payload) throws Exception { + public void consume(HoodieRecord payload) throws Exception { // Read recs and ensure we have covered all producer recs. - final HoodieRecord rec = payload.getResult(); count++; } @@ -203,8 +194,8 @@ public class TestSimpleExecutionInSpark extends HoodieClientTestHarness { } }; - SimpleHoodieExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> exec = - new SimpleHoodieExecutor(iterator, consumer, getCloningTransformer(HoodieTestDataGenerator.AVRO_SCHEMA)); + SimpleExecutor<HoodieRecord, HoodieRecord, Integer> exec = + new SimpleExecutor<>(iterator, consumer, Function.identity()); final Throwable thrown = assertThrows(HoodieException.class, exec::execute, "exception is expected"); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java index c379a7abcc0..20e5af1a91c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java @@ -61,7 +61,7 @@ public class BoundedInMemoryExecutor<I, O, E> extends BaseHoodieQueueBasedExecut } LOG.info("All records 
from the queue have been consumed"); } catch (Exception e) { - LOG.error("Error consuming records", e); + LOG.error("Failed consuming records", e); queue.markAsFailed(e); throw new HoodieException(e); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorExecutor.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorExecutor.java index 4ebbd6e528b..056c45a0bf4 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorExecutor.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorExecutor.java @@ -32,16 +32,23 @@ import java.util.function.Function; */ public class DisruptorExecutor<I, O, E> extends BaseHoodieQueueBasedExecutor<I, O, E> { - public DisruptorExecutor(final Option<Integer> bufferSize, final Iterator<I> inputItr, - HoodieConsumer<O, E> consumer, Function<I, O> transformFunction, Option<String> waitStrategy, Runnable preExecuteRunnable) { + public DisruptorExecutor(Integer bufferSize, + Iterator<I> inputItr, + HoodieConsumer<O, E> consumer, + Function<I, O> transformFunction, + String waitStrategy, + Runnable preExecuteRunnable) { this(bufferSize, Collections.singletonList(new IteratorBasedQueueProducer<>(inputItr)), consumer, transformFunction, waitStrategy, preExecuteRunnable); } - public DisruptorExecutor(final Option<Integer> bufferSize, List<HoodieProducer<I>> producers, - HoodieConsumer<O, E> consumer, final Function<I, O> transformFunction, - final Option<String> waitStrategy, Runnable preExecuteRunnable) { - super(producers, Option.of(consumer), new DisruptorMessageQueue<>(bufferSize, transformFunction, waitStrategy, producers.size(), preExecuteRunnable), preExecuteRunnable); + public DisruptorExecutor(int bufferSize, + List<HoodieProducer<I>> producers, + HoodieConsumer<O, E> consumer, + Function<I, O> transformFunction, + String waitStrategyId, + Runnable preExecuteRunnable) { + super(producers, Option.of(consumer), new 
DisruptorMessageQueue<>(bufferSize, transformFunction, waitStrategyId, producers.size(), preExecuteRunnable), preExecuteRunnable); } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorMessageQueue.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorMessageQueue.java index 19b0a04b5e8..de4ecff9ae4 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorMessageQueue.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorMessageQueue.java @@ -27,6 +27,8 @@ import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.WaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import java.util.function.Function; @@ -38,6 +40,8 @@ import java.util.function.Function; */ public class DisruptorMessageQueue<I, O> implements HoodieMessageQueue<I, O> { + private static final Logger LOG = LogManager.getLogger(DisruptorMessageQueue.class); + private final Disruptor<HoodieDisruptorEvent> queue; private final Function<I, O> transformFunction; private final RingBuffer<HoodieDisruptorEvent> ringBuffer; @@ -45,11 +49,11 @@ public class DisruptorMessageQueue<I, O> implements HoodieMessageQueue<I, O> { private boolean isShutdown = false; private boolean isStarted = false; - public DisruptorMessageQueue(Option<Integer> bufferSize, Function<I, O> transformFunction, Option<String> waitStrategyName, int totalProducers, Runnable preExecuteRunnable) { - WaitStrategy waitStrategy = WaitStrategyFactory.build(waitStrategyName); + public DisruptorMessageQueue(int bufferSize, Function<I, O> transformFunction, String waitStrategyId, int totalProducers, Runnable preExecuteRunnable) { + WaitStrategy waitStrategy = WaitStrategyFactory.build(waitStrategyId); CustomizedThreadFactory threadFactory = new CustomizedThreadFactory("disruptor", true, preExecuteRunnable); - this.queue 
= new Disruptor<>(HoodieDisruptorEvent::new, bufferSize.get(), threadFactory, totalProducers > 1 ? ProducerType.MULTI : ProducerType.SINGLE, waitStrategy); + this.queue = new Disruptor<>(HoodieDisruptorEvent::new, bufferSize, threadFactory, totalProducers > 1 ? ProducerType.MULTI : ProducerType.SINGLE, waitStrategy); this.ringBuffer = queue.getRingBuffer(); this.transformFunction = transformFunction; } @@ -103,9 +107,13 @@ public class DisruptorMessageQueue<I, O> implements HoodieMessageQueue<I, O> { } } - protected void setHandlers(HoodieConsumer consumer) { + protected void setHandlers(HoodieConsumer<O, ?> consumer) { queue.handleEventsWith((event, sequence, endOfBatch) -> { - consumer.consume(event.get()); + try { + consumer.consume(event.get()); + } catch (Exception e) { + LOG.error("Failed consuming records", e); + } }); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/ExecutorType.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/ExecutorType.java index 43b2f061030..77004173423 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/ExecutorType.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/ExecutorType.java @@ -18,12 +18,6 @@ package org.apache.hudi.common.util.queue; -import org.apache.hudi.keygen.constant.KeyGeneratorType; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - /** * Types of {@link org.apache.hudi.common.util.queue.HoodieExecutor}. */ @@ -46,12 +40,5 @@ public enum ExecutorType { * The disadvantage is that the executor is a single-write-single-read model, cannot support functions such as speed limit * and can not de-coupe the network read (shuffle read) and network write (writing objects/files to storage) anymore. 
*/ - SIMPLE; - - public static List<String> getNames() { - List<String> names = new ArrayList<>(ExecutorType.values().length); - Arrays.stream(KeyGeneratorType.values()) - .forEach(x -> names.add(x.name())); - return names; - } + SIMPLE } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/SimpleHoodieExecutor.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/SimpleExecutor.java similarity index 53% rename from hudi-common/src/main/java/org/apache/hudi/common/util/queue/SimpleHoodieExecutor.java rename to hudi-common/src/main/java/org/apache/hudi/common/util/queue/SimpleExecutor.java index 8d9fcc892be..e175c22d930 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/SimpleHoodieExecutor.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/SimpleExecutor.java @@ -18,9 +18,6 @@ package org.apache.hudi.common.util.queue; -import static org.apache.hudi.common.util.ValidationUtils.checkState; - -import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieException; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -29,31 +26,31 @@ import java.util.Iterator; import java.util.function.Function; /** - * Single Writer and Single Reader mode. Also this SimpleHoodieExecutor has no inner message queue and no inner lock. - * Consuming and writing records from iterator directly. + * Simple implementation of the {@link HoodieExecutor} interface assuming single-writer/single-reader + * mode allowing it to consume from the input {@link Iterator} directly avoiding the need for + * any internal materialization (ie queueing). * - * Compared with queue based Executor - * Advantages: there is no need for additional memory and cpu resources due to lock or multithreading. - * Disadvantages: lost some benefits such as speed limit. And maybe lower throughput. 
+ * <p> + * Such executor is aimed primarily at allowing + * the production-consumption chain to run w/ as little overhead as possible, at the expense of + * limited parallelism and therefore throughput, which is not an issue for execution environments + * such as Spark, where it's used primarily in a parallelism constraint environment (on executors) */ -public class SimpleHoodieExecutor<I, O, E> implements HoodieExecutor<E> { +public class SimpleExecutor<I, O, E> implements HoodieExecutor<E> { - private static final Logger LOG = LogManager.getLogger(SimpleHoodieExecutor.class); + private static final Logger LOG = LogManager.getLogger(SimpleExecutor.class); + // Record iterator (producer) + private final Iterator<I> itr; // Consumer - protected final Option<HoodieConsumer<O, E>> consumer; - // records iterator - protected final Iterator<I> it; - private final Function<I, O> transformFunction; + private final HoodieConsumer<O, E> consumer; - public SimpleHoodieExecutor(final Iterator<I> inputItr, HoodieConsumer<O, E> consumer, - Function<I, O> transformFunction) { - this(inputItr, Option.of(consumer), transformFunction); - } + private final Function<I, O> transformFunction; - public SimpleHoodieExecutor(final Iterator<I> inputItr, Option<HoodieConsumer<O, E>> consumer, - Function<I, O> transformFunction) { - this.it = inputItr; + public SimpleExecutor(Iterator<I> inputItr, + HoodieConsumer<O, E> consumer, + Function<I, O> transformFunction) { + this.itr = inputItr; this.consumer = consumer; this.transformFunction = transformFunction; } @@ -63,18 +60,16 @@ public class SimpleHoodieExecutor<I, O, E> implements HoodieExecutor<E> { */ @Override public E execute() { - checkState(this.consumer.isPresent()); - try { LOG.info("Starting consumer, consuming records from the records iterator directly"); - while (it.hasNext()) { - O payload = transformFunction.apply(it.next()); - consumer.get().consume(payload); + while (itr.hasNext()) { + O payload = 
transformFunction.apply(itr.next()); + consumer.consume(payload); } - return consumer.get().finish(); + return consumer.finish(); } catch (Exception e) { - LOG.error("Error consuming records in SimpleHoodieExecutor", e); + LOG.error("Failed consuming records", e); throw new HoodieException(e); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/WaitStrategyFactory.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/WaitStrategyFactory.java index dc3c4474810..5c32e7f835e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/WaitStrategyFactory.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/WaitStrategyFactory.java @@ -18,14 +18,12 @@ package org.apache.hudi.common.util.queue; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.exception.HoodieException; - import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.BusySpinWaitStrategy; import com.lmax.disruptor.SleepingWaitStrategy; import com.lmax.disruptor.WaitStrategy; import com.lmax.disruptor.YieldingWaitStrategy; +import org.apache.hudi.exception.HoodieException; import static org.apache.hudi.common.util.queue.DisruptorWaitStrategyType.BLOCKING_WAIT; @@ -39,8 +37,8 @@ public class WaitStrategyFactory { /** * Build WaitStrategy for disruptor */ - public static WaitStrategy build(Option<String> name) { - DisruptorWaitStrategyType strategyType = name.isPresent() ? DisruptorWaitStrategyType.valueOf(name.get().toUpperCase()) : BLOCKING_WAIT; + public static WaitStrategy build(String name) { + DisruptorWaitStrategyType strategyType = DisruptorWaitStrategyType.valueOf(name); switch (strategyType) { case BLOCKING_WAIT:
