This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a5434b6b4d [HUDI-3963] Use Lock-Free Message Queue Disruptor Improving
Hoodie Writing Efficiency (#5416)
a5434b6b4d is described below
commit a5434b6b4d9bef9eea29bf33f08e7f13753057a9
Author: YueZhang <[email protected]>
AuthorDate: Thu Nov 3 08:02:18 2022 +0800
[HUDI-3963] Use Lock-Free Message Queue Disruptor Improving Hoodie Writing
Efficiency (#5416)
https://issues.apache.org/jira/browse/HUDI-3963
RFC design : #5567
Add Lock-Free executor to improve hoodie writing throughput and optimize
execution efficiency.
Disruptor link:
https://lmax-exchange.github.io/disruptor/user-guide/index.html#_introduction.
Existing BoundedInMemory is the default. Users can enable the Disruptor executor on an as-needed basis.
Co-authored-by: yuezhang <[email protected]>
---
.../org/apache/hudi/config/HoodieWriteConfig.java | 51 ++++
.../hudi/execution/CopyOnWriteInsertHandler.java | 4 +-
.../action/bootstrap/BootstrapRecordConsumer.java | 8 +-
.../hudi/table/action/commit/BaseMergeHelper.java | 8 +-
.../hudi/execution/ExplicitWriteHandler.java | 4 +-
.../hudi/execution/SparkLazyInsertIterable.java | 13 +-
.../hudi/util/QueueBasedExecutorFactory.java | 52 ++++
.../TestBoundedInMemoryExecutorInSpark.java | 37 +--
.../hudi/execution/TestBoundedInMemoryQueue.java | 26 +-
.../execution/TestDisruptorExecutionInSpark.java | 159 ++++++++++
.../hudi/execution/TestDisruptorMessageQueue.java | 337 +++++++++++++++++++++
hudi-common/pom.xml | 6 +
.../hudi/common/util/CustomizedThreadFactory.java | 22 +-
.../hudi/common/util/ParquetReaderIterator.java | 4 +-
.../common/util/queue/BoundedInMemoryExecutor.java | 143 ++++-----
...ueue.java => BoundedInMemoryQueueIterable.java} | 24 +-
.../hudi/common/util/queue/DisruptorExecutor.java | 129 ++++++++
.../common/util/queue/DisruptorMessageQueue.java | 136 +++++++++
.../util/queue/DisruptorWaitStrategyType.java | 64 ++++
...nMemoryQueueConsumer.java => ExecutorType.java} | 48 ++-
.../util/queue/FunctionBasedQueueProducer.java | 8 +-
...emoryQueueProducer.java => HoodieConsumer.java} | 12 +-
...emoryQueueProducer.java => HoodieExecutor.java} | 26 +-
.../hudi/common/util/queue/HoodieExecutorBase.java | 143 +++++++++
...oducer.java => HoodieIterableMessageQueue.java} | 15 +-
...yQueueProducer.java => HoodieMessageQueue.java} | 35 ++-
...emoryQueueProducer.java => HoodieProducer.java} | 8 +-
...nsumer.java => IteratorBasedQueueConsumer.java} | 12 +-
.../util/queue/IteratorBasedQueueProducer.java | 4 +-
.../common/util/queue/WaitStrategyFactory.java | 54 ++++
.../org/apache/hudi/table/format/FormatUtils.java | 6 +-
.../realtime/RealtimeUnmergedRecordReader.java | 6 +-
.../benchmark/BoundInMemoryExecutorBenchmark.scala | 135 +++++++++
pom.xml | 1 +
34 files changed, 1498 insertions(+), 242 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 0c0966bc5f..514a4e38dc 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -49,6 +49,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.queue.ExecutorType;
import org.apache.hudi.config.metrics.HoodieMetricsCloudWatchConfig;
import org.apache.hudi.config.metrics.HoodieMetricsConfig;
import org.apache.hudi.config.metrics.HoodieMetricsDatadogConfig;
@@ -84,12 +85,14 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import static org.apache.hudi.common.util.queue.ExecutorType.BOUNDED_IN_MEMORY;
import static org.apache.hudi.config.HoodieCleanConfig.CLEANER_POLICY;
/**
@@ -132,6 +135,14 @@ public class HoodieWriteConfig extends HoodieConfig {
.withDocumentation("Key generator class, that implements
`org.apache.hudi.keygen.KeyGenerator` "
+ "extract a key out of incoming records.");
+ public static final ConfigProperty<String> EXECUTOR_TYPE = ConfigProperty
+ .key("hoodie.write.executor.type")
+ .defaultValue(BOUNDED_IN_MEMORY.name())
+ .withDocumentation("Set executor which orchestrates concurrent producers
and consumers communicating through a message queue."
+ + "default value is BOUNDED_IN_MEMORY which use a bounded in-memory
queue using LinkedBlockingQueue."
+ + "Also users could use DISRUPTOR, which use disruptor as a lock
free message queue "
+ + "to gain better writing performance if lock was the bottleneck.
Although DISRUPTOR_EXECUTOR is still an experimental feature.");
+
public static final ConfigProperty<String> KEYGENERATOR_TYPE = ConfigProperty
.key("hoodie.datasource.write.keygenerator.type")
.defaultValue(KeyGeneratorType.SIMPLE.name())
@@ -233,6 +244,19 @@ public class HoodieWriteConfig extends HoodieConfig {
.defaultValue(String.valueOf(4 * 1024 * 1024))
.withDocumentation("Size of in-memory buffer used for parallelizing
network reads and lake storage writes.");
+ public static final ConfigProperty<String> WRITE_DISRUPTOR_BUFFER_SIZE =
ConfigProperty
+ .key("hoodie.write.executor.disruptor.buffer.size")
+ .defaultValue(String.valueOf(1024))
+ .withDocumentation("The size of the Disruptor Executor ring buffer, must
be power of 2");
+
+ public static final ConfigProperty<String> WRITE_WAIT_STRATEGY =
ConfigProperty
+ .key("hoodie.write.executor.disruptor.wait.strategy")
+ .defaultValue("BLOCKING_WAIT")
+ .withDocumentation("Strategy employed for making Disruptor Executor wait
on a cursor. Other options are "
+ + "SLEEPING_WAIT, it attempts to be conservative with CPU usage by
using a simple busy wait loop"
+ + "YIELDING_WAIT, it is designed for cases where there is the option
to burn CPU cycles with the goal of improving latency"
+ + "BUSY_SPIN_WAIT, it can be used in low-latency systems, but puts
the highest constraints on the deployment environment");
+
public static final ConfigProperty<String> COMBINE_BEFORE_INSERT =
ConfigProperty
.key("hoodie.combine.before.insert")
.defaultValue("false")
@@ -975,6 +999,10 @@ public class HoodieWriteConfig extends HoodieConfig {
return getString(KEYGENERATOR_CLASS_NAME);
}
+ public ExecutorType getExecutorType() {
+ return
ExecutorType.valueOf(getString(EXECUTOR_TYPE).toUpperCase(Locale.ROOT));
+ }
+
public boolean isCDCEnabled() {
return getBooleanOrDefault(
HoodieTableConfig.CDC_ENABLED,
HoodieTableConfig.CDC_ENABLED.defaultValue());
@@ -1040,6 +1068,14 @@ public class HoodieWriteConfig extends HoodieConfig {
return
Integer.parseInt(getStringOrDefault(WRITE_BUFFER_LIMIT_BYTES_VALUE));
}
+ public Option<String> getWriteExecutorWaitStrategy() {
+ return Option.of(getString(WRITE_WAIT_STRATEGY));
+ }
+
+ public Option<Integer> getDisruptorWriteBufferSize() {
+ return
Option.of(Integer.parseInt(getStringOrDefault(WRITE_DISRUPTOR_BUFFER_SIZE)));
+ }
+
public boolean shouldCombineBeforeInsert() {
return getBoolean(COMBINE_BEFORE_INSERT);
}
@@ -2287,6 +2323,11 @@ public class HoodieWriteConfig extends HoodieConfig {
return this;
}
+ public Builder withExecutorType(String executorClass) {
+ writeConfig.setValue(EXECUTOR_TYPE, executorClass);
+ return this;
+ }
+
public Builder withTimelineLayoutVersion(int version) {
writeConfig.setValue(TIMELINE_LAYOUT_VERSION_NUM,
String.valueOf(version));
return this;
@@ -2333,6 +2374,16 @@ public class HoodieWriteConfig extends HoodieConfig {
return this;
}
+ public Builder withWriteWaitStrategy(String waitStrategy) {
+ writeConfig.setValue(WRITE_WAIT_STRATEGY, String.valueOf(waitStrategy));
+ return this;
+ }
+
+ public Builder withWriteBufferSize(int size) {
+ writeConfig.setValue(WRITE_DISRUPTOR_BUFFER_SIZE, String.valueOf(size));
+ return this;
+ }
+
public Builder combineInput(boolean onInsert, boolean onUpsert) {
writeConfig.setValue(COMBINE_BEFORE_INSERT, String.valueOf(onInsert));
writeConfig.setValue(COMBINE_BEFORE_UPSERT, String.valueOf(onUpsert));
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java
index 5e1f832b7f..abb1122289 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java
@@ -22,7 +22,7 @@ import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
+import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer;
import org.apache.hudi.config.HoodieWriteConfig;
import
org.apache.hudi.execution.HoodieLazyInsertIterable.HoodieInsertValueGenResult;
import org.apache.hudi.io.HoodieWriteHandle;
@@ -38,7 +38,7 @@ import java.util.Map;
* Consumes stream of hoodie records from in-memory queue and writes to one or
more create-handles.
*/
public class CopyOnWriteInsertHandler<T extends HoodieRecordPayload>
- extends
BoundedInMemoryQueueConsumer<HoodieInsertValueGenResult<HoodieRecord>,
List<WriteStatus>> {
+ extends
IteratorBasedQueueConsumer<HoodieInsertValueGenResult<HoodieRecord>,
List<WriteStatus>> {
private HoodieWriteConfig config;
private String instantTime;
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapRecordConsumer.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapRecordConsumer.java
index 8966a5d51c..76968c6108 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapRecordConsumer.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapRecordConsumer.java
@@ -20,7 +20,7 @@ package org.apache.hudi.table.action.bootstrap;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
+import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.HoodieBootstrapHandle;
@@ -29,7 +29,7 @@ import java.io.IOException;
/**
* Consumer that dequeues records from queue and sends to Merge Handle for
writing.
*/
-public class BootstrapRecordConsumer extends
BoundedInMemoryQueueConsumer<HoodieRecord, Void> {
+public class BootstrapRecordConsumer extends
IteratorBasedQueueConsumer<HoodieRecord, Void> {
private final HoodieBootstrapHandle bootstrapHandle;
@@ -38,7 +38,7 @@ public class BootstrapRecordConsumer extends
BoundedInMemoryQueueConsumer<Hoodie
}
@Override
- protected void consumeOneRecord(HoodieRecord record) {
+ public void consumeOneRecord(HoodieRecord record) {
try {
bootstrapHandle.write(record, ((HoodieRecordPayload) record.getData())
.getInsertValue(bootstrapHandle.getWriterSchemaWithMetaFields()));
@@ -48,7 +48,7 @@ public class BootstrapRecordConsumer extends
BoundedInMemoryQueueConsumer<Hoodie
}
@Override
- protected void finish() {}
+ public void finish() {}
@Override
protected Void getResult() {
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
index 5ead348140..bd1c01958b 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
@@ -22,7 +22,7 @@ import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.utils.MergingIterator;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
+import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.storage.HoodieFileReader;
@@ -109,7 +109,7 @@ public abstract class BaseMergeHelper<T extends
HoodieRecordPayload, I, K, O> {
/**
* Consumer that dequeues records from queue and sends to Merge Handle.
*/
- protected static class UpdateHandler extends
BoundedInMemoryQueueConsumer<GenericRecord, Void> {
+ protected static class UpdateHandler extends
IteratorBasedQueueConsumer<GenericRecord, Void> {
private final HoodieMergeHandle upsertHandle;
@@ -118,12 +118,12 @@ public abstract class BaseMergeHelper<T extends
HoodieRecordPayload, I, K, O> {
}
@Override
- protected void consumeOneRecord(GenericRecord record) {
+ public void consumeOneRecord(GenericRecord record) {
upsertHandle.write(record);
}
@Override
- protected void finish() {}
+ public void finish() {}
@Override
protected Void getResult() {
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/ExplicitWriteHandler.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/ExplicitWriteHandler.java
index 46eff58757..9dab2170e0 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/ExplicitWriteHandler.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/ExplicitWriteHandler.java
@@ -21,7 +21,7 @@ package org.apache.hudi.execution;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
+import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer;
import org.apache.hudi.io.HoodieWriteHandle;
import java.util.ArrayList;
@@ -31,7 +31,7 @@ import java.util.List;
* Consumes stream of hoodie records from in-memory queue and writes to one
explicit create handle.
*/
public class ExplicitWriteHandler<T extends HoodieRecordPayload>
- extends
BoundedInMemoryQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
List<WriteStatus>> {
+ extends
IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
List<WriteStatus>> {
private final List<WriteStatus> statuses = new ArrayList<>();
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
index df5bd2d3f4..d2555f9598 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
@@ -23,13 +23,14 @@ import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
+import org.apache.hudi.common.util.queue.HoodieExecutor;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.io.WriteHandleFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.avro.Schema;
+import org.apache.hudi.util.QueueBasedExecutorFactory;
import java.util.Iterator;
import java.util.List;
@@ -77,16 +78,16 @@ public class SparkLazyInsertIterable<T extends
HoodieRecordPayload> extends Hood
@Override
protected List<WriteStatus> computeNext() {
// Executor service used for launching writer thread.
- BoundedInMemoryExecutor<HoodieRecord<T>,
HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>>
bufferedIteratorExecutor =
- null;
+ HoodieExecutor<?, ?, List<WriteStatus>> bufferedIteratorExecutor = null;
try {
Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema());
if (useWriterSchema) {
schema = HoodieAvroUtils.addMetadataFields(schema);
}
- bufferedIteratorExecutor =
- new
BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), inputItr,
getInsertHandler(),
- getTransformFunction(schema, hoodieConfig),
hoodieTable.getPreExecuteRunnable());
+
+ bufferedIteratorExecutor =
QueueBasedExecutorFactory.create(hoodieConfig, inputItr, getInsertHandler(),
+ getTransformFunction(schema, hoodieConfig),
hoodieTable.getPreExecuteRunnable());
+
final List<WriteStatus> result = bufferedIteratorExecutor.execute();
assert result != null && !result.isEmpty() &&
!bufferedIteratorExecutor.isRemaining();
return result;
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/QueueBasedExecutorFactory.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/QueueBasedExecutorFactory.java
new file mode 100644
index 0000000000..ba8ddbd1ec
--- /dev/null
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/QueueBasedExecutorFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
+import org.apache.hudi.common.util.queue.DisruptorExecutor;
+import org.apache.hudi.common.util.queue.ExecutorType;
+import org.apache.hudi.common.util.queue.HoodieExecutor;
+import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+
+import java.util.Iterator;
+import java.util.function.Function;
+
+public class QueueBasedExecutorFactory {
+
+ /**
+ * Create a new hoodie executor instance on demand.
+ */
+ public static <I, O, E> HoodieExecutor create(HoodieWriteConfig
hoodieConfig, Iterator<I> inputItr, IteratorBasedQueueConsumer<O, E> consumer,
+ Function<I, O> transformFunction,
Runnable preExecuteRunnable) {
+ ExecutorType executorType = hoodieConfig.getExecutorType();
+
+ switch (executorType) {
+ case BOUNDED_IN_MEMORY:
+ return new
BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), inputItr,
consumer,
+ transformFunction, preExecuteRunnable);
+ case DISRUPTOR:
+ return new
DisruptorExecutor<>(hoodieConfig.getDisruptorWriteBufferSize(), inputItr,
consumer,
+ transformFunction, hoodieConfig.getWriteExecutorWaitStrategy(),
preExecuteRunnable);
+ default:
+ throw new HoodieException("Unsupported Executor Type " + executorType);
+ }
+ }
+}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java
index a714d60d00..040634da4c 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java
@@ -23,7 +23,7 @@ import
org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
-import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
+import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.testutils.HoodieClientTestHarness;
@@ -71,24 +71,21 @@ public class TestBoundedInMemoryExecutorInSpark extends
HoodieClientTestHarness
@Test
public void testExecutor() {
- final List<HoodieRecord> hoodieRecords =
dataGen.generateInserts(instantTime, 100);
+ final int recordNumber = 100;
+ final List<HoodieRecord> hoodieRecords =
dataGen.generateInserts(instantTime, recordNumber);
HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
when(hoodieWriteConfig.getWriteBufferLimitBytes()).thenReturn(1024);
-
BoundedInMemoryQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
Integer> consumer =
- new
BoundedInMemoryQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
Integer>() {
+
IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
Integer> consumer =
+ new
IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
Integer>() {
private int count = 0;
@Override
- protected void
consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>
record) {
+ public void
consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>
record) {
count++;
}
- @Override
- protected void finish() {
- }
-
@Override
protected Integer getResult() {
return count;
@@ -100,7 +97,7 @@ public class TestBoundedInMemoryExecutorInSpark extends
HoodieClientTestHarness
executor = new
BoundedInMemoryExecutor(hoodieWriteConfig.getWriteBufferLimitBytes(),
hoodieRecords.iterator(), consumer,
getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA),
getPreExecuteRunnable());
int result = executor.execute();
- // It should buffer and write 100 records
+
assertEquals(100, result);
// There should be no remaining records in the buffer
assertFalse(executor.isRemaining());
@@ -118,11 +115,11 @@ public class TestBoundedInMemoryExecutorInSpark extends
HoodieClientTestHarness
HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
when(hoodieWriteConfig.getWriteBufferLimitBytes()).thenReturn(1024);
-
BoundedInMemoryQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
Integer> consumer =
- new
BoundedInMemoryQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
Integer>() {
+
IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
Integer> consumer =
+ new
IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
Integer>() {
@Override
- protected void
consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>
record) {
+ public void
consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>
record) {
try {
while (true) {
Thread.sleep(1000);
@@ -132,10 +129,6 @@ public class TestBoundedInMemoryExecutorInSpark extends
HoodieClientTestHarness
}
}
- @Override
- protected void finish() {
- }
-
@Override
protected Integer getResult() {
return 0;
@@ -176,14 +169,10 @@ public class TestBoundedInMemoryExecutorInSpark extends
HoodieClientTestHarness
}
};
-
BoundedInMemoryQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
Integer> consumer =
- new
BoundedInMemoryQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
Integer>() {
- @Override
- protected void
consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>
record) {
- }
-
+
IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
Integer> consumer =
+ new
IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
Integer>() {
@Override
- protected void finish() {
+ public void
consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>
record) {
}
@Override
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java
index 4707a68072..c36554bb64 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java
@@ -26,9 +26,9 @@ import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SizeEstimator;
-import org.apache.hudi.common.util.queue.BoundedInMemoryQueue;
-import org.apache.hudi.common.util.queue.BoundedInMemoryQueueProducer;
+import org.apache.hudi.common.util.queue.BoundedInMemoryQueueIterable;
import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
+import org.apache.hudi.common.util.queue.HoodieProducer;
import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.testutils.HoodieClientTestHarness;
@@ -83,8 +83,8 @@ public class TestBoundedInMemoryQueue extends
HoodieClientTestHarness {
public void testRecordReading() throws Exception {
final int numRecords = 128;
final List<HoodieRecord> hoodieRecords =
dataGen.generateInserts(instantTime, numRecords);
- final BoundedInMemoryQueue<HoodieRecord,
HoodieLazyInsertIterable.HoodieInsertValueGenResult> queue =
- new BoundedInMemoryQueue(FileIOUtils.KB,
getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA));
+ final BoundedInMemoryQueueIterable<HoodieRecord,
HoodieLazyInsertIterable.HoodieInsertValueGenResult> queue =
+ new BoundedInMemoryQueueIterable(FileIOUtils.KB,
getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA));
// Produce
Future<Boolean> resFuture = executorService.submit(() -> {
new
IteratorBasedQueueProducer<>(hoodieRecords.iterator()).produce(queue);
@@ -123,8 +123,8 @@ public class TestBoundedInMemoryQueue extends
HoodieClientTestHarness {
final int numProducers = 40;
final List<List<HoodieRecord>> recs = new ArrayList<>();
- final BoundedInMemoryQueue<HoodieRecord,
HoodieLazyInsertIterable.HoodieInsertValueGenResult> queue =
- new BoundedInMemoryQueue(FileIOUtils.KB,
getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA));
+ final BoundedInMemoryQueueIterable<HoodieRecord,
HoodieLazyInsertIterable.HoodieInsertValueGenResult> queue =
+ new BoundedInMemoryQueueIterable(FileIOUtils.KB,
getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA));
// Record Key to <Producer Index, Rec Index within a producer>
Map<String, Tuple2<Integer, Integer>> keyToProducerAndIndexMap = new
HashMap<>();
@@ -140,7 +140,7 @@ public class TestBoundedInMemoryQueue extends
HoodieClientTestHarness {
recs.add(pRecs);
}
- List<BoundedInMemoryQueueProducer<HoodieRecord>> producers = new
ArrayList<>();
+ List<HoodieProducer<HoodieRecord>> producers = new ArrayList<>();
for (int i = 0; i < recs.size(); i++) {
final List<HoodieRecord> r = recs.get(i);
// Alternate between pull and push based iterators
@@ -222,8 +222,8 @@ public class TestBoundedInMemoryQueue extends
HoodieClientTestHarness {
getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA).apply((HoodieAvroRecord)
hoodieRecords.get(0));
final long objSize = sizeEstimator.sizeEstimate(payload);
final long memoryLimitInBytes = recordLimit * objSize;
- final BoundedInMemoryQueue<HoodieRecord,
HoodieLazyInsertIterable.HoodieInsertValueGenResult> queue =
- new BoundedInMemoryQueue(memoryLimitInBytes,
getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA));
+ final BoundedInMemoryQueueIterable<HoodieRecord,
HoodieLazyInsertIterable.HoodieInsertValueGenResult> queue =
+ new BoundedInMemoryQueueIterable(memoryLimitInBytes,
getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA));
// Produce
executorService.submit(() -> {
@@ -275,8 +275,8 @@ public class TestBoundedInMemoryQueue extends
HoodieClientTestHarness {
// first let us throw exception from queueIterator reader and test that
queueing thread
// stops and throws
// correct exception back.
- BoundedInMemoryQueue<HoodieRecord, Tuple2<HoodieRecord,
Option<IndexedRecord>>> queue1 =
- new BoundedInMemoryQueue(memoryLimitInBytes,
getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA));
+ BoundedInMemoryQueueIterable<HoodieRecord, Tuple2<HoodieRecord,
Option<IndexedRecord>>> queue1 =
+ new BoundedInMemoryQueueIterable(memoryLimitInBytes,
getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA));
// Produce
Future<Boolean> resFuture = executorService.submit(() -> {
@@ -303,8 +303,8 @@ public class TestBoundedInMemoryQueue extends
HoodieClientTestHarness {
final Iterator<HoodieRecord> mockHoodieRecordsIterator =
mock(Iterator.class);
when(mockHoodieRecordsIterator.hasNext()).thenReturn(true);
when(mockHoodieRecordsIterator.next()).thenThrow(expectedException);
- BoundedInMemoryQueue<HoodieRecord, Tuple2<HoodieRecord,
Option<IndexedRecord>>> queue2 =
- new BoundedInMemoryQueue(memoryLimitInBytes,
getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA));
+ BoundedInMemoryQueueIterable<HoodieRecord, Tuple2<HoodieRecord,
Option<IndexedRecord>>> queue2 =
+ new BoundedInMemoryQueueIterable(memoryLimitInBytes,
getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA));
// Produce
Future<Boolean> res = executorService.submit(() -> {
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorExecutionInSpark.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorExecutionInSpark.java
new file mode 100644
index 0000000000..2351f2bbed
--- /dev/null
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorExecutionInSpark.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution;
+
+import static
org.apache.hudi.execution.HoodieLazyInsertIterable.getTransformFunction;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer;
+import org.apache.hudi.common.util.queue.DisruptorExecutor;
+import org.apache.hudi.common.util.queue.WaitStrategyFactory;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.spark.TaskContext;
+import org.apache.spark.TaskContext$;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import scala.Tuple2;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestDisruptorExecutionInSpark extends HoodieClientTestHarness {
+
+ private final String instantTime =
HoodieActiveTimeline.createNewInstantTime();
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ initTestDataGenerator();
+ initExecutorServiceWithFixedThreadPool(2);
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ cleanupResources();
+ }
+
+ private Runnable getPreExecuteRunnable() {
+ final TaskContext taskContext = TaskContext.get();
+ return () -> TaskContext$.MODULE$.setTaskContext(taskContext);
+ }
+
+ @Test
+ public void testExecutor() {
+
+ final List<HoodieRecord> hoodieRecords =
dataGen.generateInserts(instantTime, 128);
+ final List<HoodieRecord> consumedRecords = new ArrayList<>();
+
+ HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
+
when(hoodieWriteConfig.getDisruptorWriteBufferSize()).thenReturn(Option.of(8));
+
IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
Integer> consumer =
+ new
IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
Integer>() {
+
+ private int count = 0;
+
+ @Override
+ public void
consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>
record) {
+ consumedRecords.add(record.record);
+ count++;
+ }
+
+ @Override
+ protected Integer getResult() {
+ return count;
+ }
+ };
+ DisruptorExecutor<HoodieRecord, Tuple2<HoodieRecord,
Option<IndexedRecord>>, Integer> exec = null;
+
+ try {
+ exec = new
DisruptorExecutor(hoodieWriteConfig.getDisruptorWriteBufferSize(),
hoodieRecords.iterator(), consumer,
+ getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA),
Option.of(WaitStrategyFactory.DEFAULT_STRATEGY), getPreExecuteRunnable());
+ int result = exec.execute();
+ // It should buffer and write 128 records
+ assertEquals(128, result);
+ // There should be no remaining records in the buffer
+ assertFalse(exec.isRemaining());
+
+ // collect all records and assert that consumed records are identical to
produced ones
+ // assert there's no tampering, and that the ordering is preserved
+ assertEquals(hoodieRecords, consumedRecords);
+ for (int i = 0; i < hoodieRecords.size(); i++) {
+ assertEquals(hoodieRecords.get(i), consumedRecords.get(i));
+ }
+
+ } finally {
+ if (exec != null) {
+ exec.shutdownNow();
+ }
+ }
+ }
+
+ @Test
+ @Timeout(value = 60)
+ public void testInterruptExecutor() {
+ final List<HoodieRecord> hoodieRecords =
dataGen.generateInserts(instantTime, 100);
+
+ HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
+
when(hoodieWriteConfig.getDisruptorWriteBufferSize()).thenReturn(Option.of(1024));
+
IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
Integer> consumer =
+ new
IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
Integer>() {
+
+ @Override
+ public void
consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>
record) {
+ try {
+ Thread.currentThread().wait();
+ } catch (InterruptedException ie) {
+ // ignore here
+ }
+ }
+
+ @Override
+ protected Integer getResult() {
+ return 0;
+ }
+ };
+
+ DisruptorExecutor<HoodieRecord, Tuple2<HoodieRecord,
Option<IndexedRecord>>, Integer>
+ executor = new
DisruptorExecutor(hoodieWriteConfig.getDisruptorWriteBufferSize(),
hoodieRecords.iterator(), consumer,
+ getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA),
Option.of(WaitStrategyFactory.DEFAULT_STRATEGY), getPreExecuteRunnable());
+
+ try {
+ Thread.currentThread().interrupt();
+ assertThrows(HoodieException.class, executor::execute);
+ assertTrue(Thread.interrupted());
+ } catch (Exception e) {
+ // ignore here
+ }
+ }
+}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java
new file mode 100644
index 0000000000..d296d56440
--- /dev/null
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution;
+
+import static
org.apache.hudi.execution.HoodieLazyInsertIterable.getTransformFunction;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.queue.DisruptorMessageQueue;
+import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
+import org.apache.hudi.common.util.queue.HoodieProducer;
+import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer;
+import org.apache.hudi.common.util.queue.DisruptorExecutor;
+import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
+import org.apache.hudi.common.util.queue.WaitStrategyFactory;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.spark.TaskContext;
+import org.apache.spark.TaskContext$;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import scala.Tuple2;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestDisruptorMessageQueue extends HoodieClientTestHarness {
+
+ private final String instantTime =
HoodieActiveTimeline.createNewInstantTime();
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ initTestDataGenerator();
+ initExecutorServiceWithFixedThreadPool(2);
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ cleanupResources();
+ }
+
+ private Runnable getPreExecuteRunnable() {
+ final TaskContext taskContext = TaskContext.get();
+ return () -> TaskContext$.MODULE$.setTaskContext(taskContext);
+ }
+
+ // Test to ensure that we are reading all records from queue iterator in the
same order
+ // without any exceptions.
+ @SuppressWarnings("unchecked")
+ @Test
+ @Timeout(value = 60)
+ public void testRecordReading() throws Exception {
+
+ final List<HoodieRecord> hoodieRecords =
dataGen.generateInserts(instantTime, 100);
+ ArrayList<HoodieRecord> beforeRecord = new ArrayList<>();
+ ArrayList<IndexedRecord> beforeIndexedRecord = new ArrayList<>();
+ ArrayList<HoodieAvroRecord> afterRecord = new ArrayList<>();
+ ArrayList<IndexedRecord> afterIndexedRecord = new ArrayList<>();
+
+ hoodieRecords.forEach(record -> {
+ final HoodieAvroRecord originalRecord = (HoodieAvroRecord) record;
+ beforeRecord.add(originalRecord);
+ try {
+ final Option<IndexedRecord> originalInsertValue =
+
originalRecord.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA);
+ beforeIndexedRecord.add(originalInsertValue.get());
+ } catch (IOException e) {
+ // ignore exception here.
+ }
+ });
+
+ HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
+
when(hoodieWriteConfig.getDisruptorWriteBufferSize()).thenReturn(Option.of(16));
+
IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
Integer> consumer =
+ new
IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
Integer>() {
+
+ private int count = 0;
+
+ @Override
+ public void
consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>
record) {
+ count++;
+ afterRecord.add((HoodieAvroRecord) record.record);
+ try {
+ IndexedRecord indexedRecord = (IndexedRecord)((HoodieAvroRecord)
record.record)
+
.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA).get();
+ afterIndexedRecord.add(indexedRecord);
+ } catch (IOException e) {
+ //ignore exception here.
+ }
+ }
+
+ @Override
+ protected Integer getResult() {
+ return count;
+ }
+ };
+
+ DisruptorExecutor<HoodieRecord, Tuple2<HoodieRecord,
Option<IndexedRecord>>, Integer> exec = null;
+
+ try {
+ exec = new
DisruptorExecutor(hoodieWriteConfig.getDisruptorWriteBufferSize(),
hoodieRecords.iterator(), consumer,
+ getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA),
Option.of(WaitStrategyFactory.DEFAULT_STRATEGY), getPreExecuteRunnable());
+ int result = exec.execute();
+ // It should buffer and write 100 records
+ assertEquals(100, result);
+ // There should be no remaining records in the buffer
+ assertFalse(exec.isRemaining());
+
+ assertEquals(beforeRecord, afterRecord);
+ assertEquals(beforeIndexedRecord, afterIndexedRecord);
+
+ } finally {
+ if (exec != null) {
+ exec.shutdownNow();
+ }
+ }
+ }
+
+ /**
+ * Test to ensure that we are reading all records from queue iterator when
we have multiple producers.
+ */
+ @SuppressWarnings("unchecked")
+ @Test
+ @Timeout(value = 60)
+ public void testCompositeProducerRecordReading() throws Exception {
+ final int numRecords = 1000;
+ final int numProducers = 40;
+ final List<List<HoodieRecord>> recs = new ArrayList<>();
+
+ final DisruptorMessageQueue<HoodieRecord,
HoodieLazyInsertIterable.HoodieInsertValueGenResult> queue =
+ new DisruptorMessageQueue(Option.of(1024),
getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA),
+ Option.of("BLOCKING_WAIT"), numProducers, new Runnable() {
+ @Override
+ public void run() {
+ // do nothing.
+ }
+ });
+
+ // Record Key to <Producer Index, Rec Index within a producer>
+ Map<String, Pair<Integer, Integer>> keyToProducerAndIndexMap = new
HashMap<>();
+
+ for (int i = 0; i < numProducers; i++) {
+ List<HoodieRecord> pRecs = dataGen.generateInserts(instantTime,
numRecords);
+ int j = 0;
+ for (HoodieRecord r : pRecs) {
+ assertFalse(keyToProducerAndIndexMap.containsKey(r.getRecordKey()));
+ keyToProducerAndIndexMap.put(r.getRecordKey(), Pair.of(i, j));
+ j++;
+ }
+ recs.add(pRecs);
+ }
+
+ List<HoodieProducer> producers = new ArrayList<>();
+ for (int i = 0; i < recs.size(); i++) {
+ final List<HoodieRecord> r = recs.get(i);
+ // Alternate between pull and push based iterators
+ if (i % 2 == 0) {
+ HoodieProducer producer = new
IteratorBasedQueueProducer<>(r.iterator());
+ producers.add(producer);
+ } else {
+ HoodieProducer producer = new FunctionBasedQueueProducer<>((buf) -> {
+ Iterator<HoodieRecord> itr = r.iterator();
+ while (itr.hasNext()) {
+ try {
+ buf.insertRecord(itr.next());
+ } catch (Exception e) {
+ throw new HoodieException(e);
+ }
+ }
+ return true;
+ });
+ producers.add(producer);
+ }
+ }
+
+ // Used to ensure that consumer sees the records generated by a single
producer in FIFO order
+ Map<Integer, Integer> lastSeenMap =
+ IntStream.range(0,
numProducers).boxed().collect(Collectors.toMap(Function.identity(), x -> -1));
+ Map<Integer, Integer> countMap =
+ IntStream.range(0,
numProducers).boxed().collect(Collectors.toMap(Function.identity(), x -> 0));
+
+ // setup consumer and start disruptor
+
IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
Integer> consumer =
+ new
IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
Integer>() {
+
+ @Override
+ public void
consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>
payload) {
+ // Read recs and ensure we have covered all producer recs.
+ final HoodieRecord rec = payload.record;
+ Pair<Integer, Integer> producerPos =
keyToProducerAndIndexMap.get(rec.getRecordKey());
+ Integer lastSeenPos = lastSeenMap.get(producerPos.getLeft());
+ countMap.put(producerPos.getLeft(),
countMap.get(producerPos.getLeft()) + 1);
+ lastSeenMap.put(producerPos.getLeft(), lastSeenPos + 1);
+ // Ensure we are seeing the next record generated
+ assertEquals(lastSeenPos + 1, producerPos.getRight().intValue());
+ }
+
+ @Override
+ protected Integer getResult() {
+ return 0;
+ }
+ };
+
+ Method setHandlersFunc = queue.getClass().getDeclaredMethod("setHandlers",
IteratorBasedQueueConsumer.class);
+ setHandlersFunc.setAccessible(true);
+ setHandlersFunc.invoke(queue, consumer);
+
+ Method startFunc = queue.getClass().getDeclaredMethod("start");
+ startFunc.setAccessible(true);
+ startFunc.invoke(queue);
+
+ // start to produce records
+ CompletableFuture<Void> producerFuture =
CompletableFuture.allOf(producers.stream().map(producer -> {
+ return CompletableFuture.supplyAsync(() -> {
+ try {
+ producer.produce(queue);
+ } catch (Throwable e) {
+ throw new HoodieException("Error producing records in disruptor
executor", e);
+ }
+ return true;
+ }, executorService);
+ }).toArray(CompletableFuture[]::new));
+
+ producerFuture.get();
+
+ // wait for all the records consumed.
+ queue.close();
+
+ for (int i = 0; i < numProducers; i++) {
+ // Ensure we have seen all the records for each producers
+ assertEquals(Integer.valueOf(numRecords), countMap.get(i));
+ }
+ }
+
+ /**
+ * Test to ensure that an exception thrown by one of the producers stops the current
ingestion.
+ */
+ @SuppressWarnings("unchecked")
+ @Test
+ @Timeout(value = 60)
+ public void testException() throws Exception {
+ final int numRecords = 1000;
+ final int numProducers = 40;
+
+ final DisruptorMessageQueue<HoodieRecord,
HoodieLazyInsertIterable.HoodieInsertValueGenResult> queue =
+ new DisruptorMessageQueue(Option.of(1024),
getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA),
+ Option.of("BLOCKING_WAIT"), numProducers, new Runnable() {
+ @Override
+ public void run() {
+ // do nothing.
+ }
+ });
+
+ List<HoodieRecord> pRecs = dataGen.generateInserts(instantTime,
numRecords);
+
+ // create 2 producers
+ // producer1 : common producer
+ // producer2 : exception producer
+ List<HoodieProducer> producers = new ArrayList<>();
+ for (int i = 0; i < 2; i++) {
+ if (i % 2 == 0) {
+ producers.add(new IteratorBasedQueueProducer<>(pRecs.iterator()));
+ } else {
+ producers.add(new FunctionBasedQueueProducer<>((buf) -> {
+ throw new HoodieException("Exception when produce records!!!");
+ }));
+ }
+ }
+
+
+
IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
Integer> consumer =
+ new
IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
Integer>() {
+
+ int count = 0;
+ @Override
+ public void
consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>
payload) {
+ // Read recs and ensure we have covered all producer recs.
+ final HoodieRecord rec = payload.record;
+ count++;
+ }
+
+ @Override
+ protected Integer getResult() {
+ return count;
+ }
+ };
+
+ DisruptorExecutor<HoodieRecord, Tuple2<HoodieRecord,
Option<IndexedRecord>>, Integer> exec = new DisruptorExecutor(Option.of(1024),
+ producers, Option.of(consumer),
getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA),
+ Option.of(WaitStrategyFactory.DEFAULT_STRATEGY),
getPreExecuteRunnable());
+
+ final Throwable thrown = assertThrows(HoodieException.class, exec::execute,
+ "exception is expected");
+ assertTrue(thrown.getMessage().contains("Error producing records in
disruptor executor"));
+ }
+}
diff --git a/hudi-common/pom.xml b/hudi-common/pom.xml
index 87b8e5e0be..15becc3c16 100644
--- a/hudi-common/pom.xml
+++ b/hudi-common/pom.xml
@@ -299,5 +299,11 @@
<artifactId>joda-time</artifactId>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>com.lmax</groupId>
+ <artifactId>disruptor</artifactId>
+ <version>${disruptor.version}</version>
+ </dependency>
</dependencies>
</project>
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/CustomizedThreadFactory.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/CustomizedThreadFactory.java
index 738be514b2..a13f3a804f 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/util/CustomizedThreadFactory.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/util/CustomizedThreadFactory.java
@@ -33,6 +33,8 @@ public class CustomizedThreadFactory implements ThreadFactory
{
private final String threadName;
private final boolean daemon;
+ private Runnable preExecuteRunnable;
+
public CustomizedThreadFactory() {
this("pool-" + POOL_NUM.getAndIncrement(), false);
}
@@ -41,6 +43,16 @@ public class CustomizedThreadFactory implements
ThreadFactory {
this(threadNamePrefix, false);
}
+ public CustomizedThreadFactory(String threadNamePrefix, Runnable
preExecuteRunnable) {
+ this(threadNamePrefix, false, preExecuteRunnable);
+ }
+
+ public CustomizedThreadFactory(String threadNamePrefix, boolean daemon,
Runnable preExecuteRunnable) {
+ this.threadName = threadNamePrefix + "-thread-";
+ this.daemon = daemon;
+ this.preExecuteRunnable = preExecuteRunnable;
+ }
+
public CustomizedThreadFactory(String threadNamePrefix, boolean daemon) {
this.threadName = threadNamePrefix + "-thread-";
this.daemon = daemon;
@@ -48,7 +60,15 @@ public class CustomizedThreadFactory implements
ThreadFactory {
@Override
public Thread newThread(@NotNull Runnable r) {
- Thread runThread = new Thread(r);
+ Thread runThread = preExecuteRunnable == null ? new Thread(r) : new
Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ preExecuteRunnable.run();
+ r.run();
+ }
+ });
+
runThread.setDaemon(daemon);
runThread.setName(threadName + threadNum.getAndIncrement());
return runThread;
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java
index 03bd471b60..347bcdf77a 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java
@@ -18,7 +18,7 @@
package org.apache.hudi.common.util;
-import org.apache.hudi.common.util.queue.BoundedInMemoryQueue;
+import org.apache.hudi.common.util.queue.BoundedInMemoryQueueIterable;
import org.apache.hudi.exception.HoodieException;
import org.apache.parquet.hadoop.ParquetReader;
@@ -27,7 +27,7 @@ import java.io.IOException;
/**
* This class wraps a parquet reader and provides an iterator based api to
read from a parquet file. This is used in
- * {@link BoundedInMemoryQueue}
+ * {@link BoundedInMemoryQueueIterable}
*/
public class ParquetReaderIterator<T> implements ClosableIterator<T> {
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java
index 46ef5dc40c..ce5898c7c3 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java
@@ -18,119 +18,98 @@
package org.apache.hudi.common.util.queue;
-import org.apache.hudi.common.util.CustomizedThreadFactory;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SizeEstimator;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
import java.util.function.Function;
-import java.util.stream.Collectors;
/**
- * Executor which orchestrates concurrent producers and consumers
communicating through a bounded in-memory queue. This
+ * Executor which orchestrates concurrent producers and consumers
communicating through 'BoundedInMemoryQueue'. This
* class takes as input the size limit, queue producer(s), consumer and
transformer and exposes API to orchestrate
* concurrent execution of these actors communicating through a central
bounded queue
*/
-public class BoundedInMemoryExecutor<I, O, E> {
+public class BoundedInMemoryExecutor<I, O, E> extends HoodieExecutorBase<I, O,
E> {
private static final Logger LOG =
LogManager.getLogger(BoundedInMemoryExecutor.class);
- private static final long TERMINATE_WAITING_TIME_SECS = 60L;
- // Executor service used for launching write thread.
- private final ExecutorService producerExecutorService;
- // Executor service used for launching read thread.
- private final ExecutorService consumerExecutorService;
- // Used for buffering records which is controlled by
HoodieWriteConfig#WRITE_BUFFER_LIMIT_BYTES.
- private final BoundedInMemoryQueue<I, O> queue;
- // Producers
- private final List<BoundedInMemoryQueueProducer<I>> producers;
- // Consumer
- private final Option<BoundedInMemoryQueueConsumer<O, E>> consumer;
- // pre-execute function to implement environment specific behavior before
executors (producers/consumer) run
- private final Runnable preExecuteRunnable;
+ private final HoodieMessageQueue<I, O> queue;
public BoundedInMemoryExecutor(final long bufferLimitInBytes, final
Iterator<I> inputItr,
- BoundedInMemoryQueueConsumer<O, E> consumer,
Function<I, O> transformFunction, Runnable preExecuteRunnable) {
+ IteratorBasedQueueConsumer<O, E> consumer,
Function<I, O> transformFunction, Runnable preExecuteRunnable) {
this(bufferLimitInBytes, new IteratorBasedQueueProducer<>(inputItr),
Option.of(consumer), transformFunction, preExecuteRunnable);
}
- public BoundedInMemoryExecutor(final long bufferLimitInBytes,
BoundedInMemoryQueueProducer<I> producer,
- Option<BoundedInMemoryQueueConsumer<O, E>>
consumer, final Function<I, O> transformFunction) {
+ public BoundedInMemoryExecutor(final long bufferLimitInBytes,
HoodieProducer<I> producer,
+ Option<IteratorBasedQueueConsumer<O, E>>
consumer, final Function<I, O> transformFunction) {
this(bufferLimitInBytes, producer, consumer, transformFunction,
Functions.noop());
}
- public BoundedInMemoryExecutor(final long bufferLimitInBytes,
BoundedInMemoryQueueProducer<I> producer,
- Option<BoundedInMemoryQueueConsumer<O, E>>
consumer, final Function<I, O> transformFunction, Runnable preExecuteRunnable) {
+ public BoundedInMemoryExecutor(final long bufferLimitInBytes,
HoodieProducer<I> producer,
+ Option<IteratorBasedQueueConsumer<O, E>>
consumer, final Function<I, O> transformFunction, Runnable preExecuteRunnable) {
this(bufferLimitInBytes, Collections.singletonList(producer), consumer,
transformFunction, new DefaultSizeEstimator<>(), preExecuteRunnable);
}
- public BoundedInMemoryExecutor(final long bufferLimitInBytes,
List<BoundedInMemoryQueueProducer<I>> producers,
- Option<BoundedInMemoryQueueConsumer<O, E>>
consumer, final Function<I, O> transformFunction,
+ public BoundedInMemoryExecutor(final long bufferLimitInBytes,
List<HoodieProducer<I>> producers,
+ Option<IteratorBasedQueueConsumer<O, E>>
consumer, final Function<I, O> transformFunction,
final SizeEstimator<O> sizeEstimator,
Runnable preExecuteRunnable) {
- this.producers = producers;
- this.consumer = consumer;
- this.preExecuteRunnable = preExecuteRunnable;
- // Ensure fixed thread for each producer thread
- this.producerExecutorService =
Executors.newFixedThreadPool(producers.size(), new
CustomizedThreadFactory("producer"));
- // Ensure single thread for consumer
- this.consumerExecutorService = Executors.newSingleThreadExecutor(new
CustomizedThreadFactory("consumer"));
- this.queue = new BoundedInMemoryQueue<>(bufferLimitInBytes,
transformFunction, sizeEstimator);
+ super(producers, consumer, preExecuteRunnable);
+ this.queue = new BoundedInMemoryQueueIterable<>(bufferLimitInBytes,
transformFunction, sizeEstimator);
}
/**
- * Start all Producers.
+ * Start all producers at once.
*/
- public ExecutorCompletionService<Boolean> startProducers() {
+ @Override
+ public CompletableFuture<Void> startProducers() {
// Latch to control when and which producer thread will close the queue
final CountDownLatch latch = new CountDownLatch(producers.size());
- final ExecutorCompletionService<Boolean> completionService =
- new ExecutorCompletionService<Boolean>(producerExecutorService);
- producers.stream().map(producer -> {
- return completionService.submit(() -> {
+
+ return CompletableFuture.allOf(producers.stream().map(producer -> {
+ return CompletableFuture.supplyAsync(() -> {
try {
- preExecuteRunnable.run();
producer.produce(queue);
} catch (Throwable e) {
LOG.error("error producing records", e);
queue.markAsFailed(e);
- throw e;
+ throw new HoodieException("Error producing records in bounded in
memory executor", e);
} finally {
synchronized (latch) {
latch.countDown();
if (latch.getCount() == 0) {
// Mark production as done so that consumer will be able to exit
- queue.close();
+ try {
+ queue.close();
+ } catch (IOException e) {
+ throw new HoodieIOException("Catch Exception when closing
BoundedInMemoryQueue.", e);
+ }
}
}
}
return true;
- });
- }).collect(Collectors.toList());
- return completionService;
+ }, producerExecutorService);
+ }).toArray(CompletableFuture[]::new));
}
/**
* Start only consumer.
*/
- private Future<E> startConsumer() {
+ @Override
+ protected CompletableFuture<E> startConsumer() {
return consumer.map(consumer -> {
- return consumerExecutorService.submit(() -> {
+ return CompletableFuture.supplyAsync(() -> {
LOG.info("starting consumer thread");
- preExecuteRunnable.run();
try {
E result = consumer.consume(queue);
LOG.info("Queue Consumption is done; notifying producer threads");
@@ -138,61 +117,41 @@ public class BoundedInMemoryExecutor<I, O, E> {
} catch (Exception e) {
LOG.error("error consuming records", e);
queue.markAsFailed(e);
- throw e;
+ throw new HoodieException(e);
}
- });
+ }, consumerExecutorService);
}).orElse(CompletableFuture.completedFuture(null));
}
- /**
- * Main API to run both production and consumption.
- */
- public E execute() {
- try {
- startProducers();
- Future<E> future = startConsumer();
- // Wait for consumer to be done
- return future.get();
- } catch (InterruptedException ie) {
- shutdownNow();
- Thread.currentThread().interrupt();
- throw new HoodieException(ie);
- } catch (Exception e) {
- throw new HoodieException(e);
- }
+ @Override
+ public boolean isRemaining() {
+ return getQueue().iterator().hasNext();
}
- public boolean isRemaining() {
- return queue.iterator().hasNext();
+ @Override
+ protected void postAction() {
+ super.close();
}
+ @Override
public void shutdownNow() {
producerExecutorService.shutdownNow();
consumerExecutorService.shutdownNow();
// close queue to force producer stop
- queue.close();
- }
-
- public boolean awaitTermination() {
- // if current thread has been interrupted before awaitTermination was
called, we still give
- // executor a chance to proceeding. So clear the interrupt flag and reset
it if needed before return.
- boolean interruptedBefore = Thread.interrupted();
- boolean producerTerminated = false;
- boolean consumerTerminated = false;
try {
- producerTerminated =
producerExecutorService.awaitTermination(TERMINATE_WAITING_TIME_SECS,
TimeUnit.SECONDS);
- consumerTerminated =
consumerExecutorService.awaitTermination(TERMINATE_WAITING_TIME_SECS,
TimeUnit.SECONDS);
- } catch (InterruptedException ie) {
- // fail silently for any other interruption
+ queue.close();
+ } catch (IOException e) {
+ throw new HoodieIOException("catch IOException while closing
HoodieMessageQueue", e);
}
- // reset interrupt flag if needed
- if (interruptedBefore) {
- Thread.currentThread().interrupt();
- }
- return producerTerminated && consumerTerminated;
}
- public BoundedInMemoryQueue<I, O> getQueue() {
- return queue;
+ @Override
+ public BoundedInMemoryQueueIterable<I, O> getQueue() {
+ return (BoundedInMemoryQueueIterable<I, O>)queue;
+ }
+
+ @Override
+ protected void setup() {
+ // do nothing.
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueIterable.java
similarity index 94%
rename from
hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java
rename to
hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueIterable.java
index dfe33b49ec..47b8c81fc4 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueIterable.java
@@ -49,7 +49,7 @@ import java.util.function.Function;
* @param <I> input payload data type
* @param <O> output payload data type
*/
-public class BoundedInMemoryQueue<I, O> implements Iterable<O> {
+public class BoundedInMemoryQueueIterable<I, O> extends
HoodieIterableMessageQueue<I, O> {
/** Interval used for polling records in the queue. **/
public static final int RECORD_POLL_INTERVAL_SEC = 1;
@@ -60,7 +60,7 @@ public class BoundedInMemoryQueue<I, O> implements
Iterable<O> {
/** Maximum records that will be cached. **/
private static final int RECORD_CACHING_LIMIT = 128 * 1024;
- private static final Logger LOG =
LogManager.getLogger(BoundedInMemoryQueue.class);
+ private static final Logger LOG =
LogManager.getLogger(BoundedInMemoryQueueIterable.class);
/**
* It indicates number of records to cache. We will be using sampled
record's average size to
@@ -116,7 +116,7 @@ public class BoundedInMemoryQueue<I, O> implements
Iterable<O> {
* @param memoryLimit MemoryLimit in bytes
* @param transformFunction Transformer Function to convert input payload
type to stored payload type
*/
- public BoundedInMemoryQueue(final long memoryLimit, final Function<I, O>
transformFunction) {
+ public BoundedInMemoryQueueIterable(final long memoryLimit, final
Function<I, O> transformFunction) {
this(memoryLimit, transformFunction, new DefaultSizeEstimator() {});
}
@@ -127,15 +127,16 @@ public class BoundedInMemoryQueue<I, O> implements
Iterable<O> {
* @param transformFunction Transformer Function to convert input payload
type to stored payload type
* @param payloadSizeEstimator Payload Size Estimator
*/
- public BoundedInMemoryQueue(final long memoryLimit, final Function<I, O>
transformFunction,
- final SizeEstimator<O> payloadSizeEstimator) {
+ public BoundedInMemoryQueueIterable(final long memoryLimit, final
Function<I, O> transformFunction,
+ final SizeEstimator<O>
payloadSizeEstimator) {
this.memoryLimit = memoryLimit;
this.transformFunction = transformFunction;
this.payloadSizeEstimator = payloadSizeEstimator;
this.iterator = new QueueIterator();
}
- public int size() {
+ @Override
+ public long size() {
return this.queue.size();
}
@@ -174,6 +175,7 @@ public class BoundedInMemoryQueue<I, O> implements
Iterable<O> {
*
* @param t Item to be queued
*/
+ @Override
public void insertRecord(I t) throws Exception {
// If already closed, throw exception
if (isWriteDone.get()) {
@@ -203,7 +205,8 @@ public class BoundedInMemoryQueue<I, O> implements
Iterable<O> {
* Reader interface but never exposed to outside world as this is a single
consumer queue. Reading is done through a
* singleton iterator for this queue.
*/
- private Option<O> readNextRecord() {
+ @Override
+ public Option<O> readNextRecord() {
if (this.isReadDone.get()) {
return Option.empty();
}
@@ -237,6 +240,7 @@ public class BoundedInMemoryQueue<I, O> implements
Iterable<O> {
/**
* Puts an empty entry to queue to denote termination.
*/
+ @Override
public void close() {
// done queueing records notifying queue-reader.
isWriteDone.set(true);
@@ -252,6 +256,7 @@ public class BoundedInMemoryQueue<I, O> implements
Iterable<O> {
/**
* API to allow producers and consumer to communicate termination due to
failure.
*/
+ @Override
public void markAsFailed(Throwable e) {
this.hasFailed.set(e);
// release the permits so that if the queueing thread is waiting for
permits then it will
@@ -259,6 +264,11 @@ public class BoundedInMemoryQueue<I, O> implements
Iterable<O> {
this.rateLimiter.release(RECORD_CACHING_LIMIT + 1);
}
+ @Override
+ public boolean isEmpty() {
+ return this.queue.size() == 0;
+ }
+
@Override
public Iterator<O> iterator() {
return iterator;
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorExecutor.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorExecutor.java
new file mode 100644
index 0000000000..7ea5de07c0
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorExecutor.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.queue;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+/**
+ * Executor which orchestrates concurrent producers and consumers
communicating through 'DisruptorMessageQueue'. This
+ * class takes as queue producer(s), consumer and transformer and exposes API
to orchestrate
+ * concurrent execution of these actors communicating through disruptor
+ */
+public class DisruptorExecutor<I, O, E> extends HoodieExecutorBase<I, O, E> {
+
+ private static final Logger LOG =
LogManager.getLogger(DisruptorExecutor.class);
+ private final HoodieMessageQueue<I, O> queue;
+
+ public DisruptorExecutor(final Option<Integer> bufferSize, final Iterator<I>
inputItr,
+ IteratorBasedQueueConsumer<O, E> consumer,
Function<I, O> transformFunction, Option<String> waitStrategy, Runnable
preExecuteRunnable) {
+ this(bufferSize, new IteratorBasedQueueProducer<>(inputItr),
Option.of(consumer), transformFunction, waitStrategy, preExecuteRunnable);
+ }
+
+ public DisruptorExecutor(final Option<Integer> bufferSize, HoodieProducer<I>
producer,
+ Option<IteratorBasedQueueConsumer<O, E>> consumer,
final Function<I, O> transformFunction, Option<String> waitStrategy, Runnable
preExecuteRunnable) {
+ this(bufferSize, Collections.singletonList(producer), consumer,
transformFunction, waitStrategy, preExecuteRunnable);
+ }
+
+ public DisruptorExecutor(final Option<Integer> bufferSize,
List<HoodieProducer<I>> producers,
+ Option<IteratorBasedQueueConsumer<O, E>> consumer,
final Function<I, O> transformFunction,
+ final Option<String> waitStrategy, Runnable
preExecuteRunnable) {
+ super(producers, consumer, preExecuteRunnable);
+ this.queue = new DisruptorMessageQueue<>(bufferSize, transformFunction,
waitStrategy, producers.size(), preExecuteRunnable);
+ }
+
+ /**
+ * Start all Producers.
+ */
+ @Override
+ public CompletableFuture<Void> startProducers() {
+ return CompletableFuture.allOf(producers.stream().map(producer -> {
+ return CompletableFuture.supplyAsync(() -> {
+ try {
+ producer.produce(queue);
+ } catch (Throwable e) {
+ LOG.error("error producing records", e);
+ throw new HoodieException("Error producing records in disruptor
executor", e);
+ }
+ return true;
+ }, producerExecutorService);
+ }).toArray(CompletableFuture[]::new));
+ }
+
+ @Override
+ protected void setup() {
+ ((DisruptorMessageQueue)queue).setHandlers(consumer.get());
+ ((DisruptorMessageQueue)queue).start();
+ }
+
+ @Override
+ protected void postAction() {
+ try {
+ super.close();
+ queue.close();
+ } catch (IOException e) {
+ throw new HoodieIOException("Catch IOException while closing
DisruptorMessageQueue", e);
+ }
+ }
+
+ @Override
+ protected CompletableFuture<E> startConsumer() {
+ return producerFuture.thenApplyAsync(res -> {
+ try {
+ queue.close();
+ consumer.get().finish();
+ return consumer.get().getResult();
+ } catch (IOException e) {
+ throw new HoodieIOException("Catch Exception when closing", e);
+ }
+ }, consumerExecutorService);
+ }
+
+ @Override
+ public boolean isRemaining() {
+ return !queue.isEmpty();
+ }
+
+ @Override
+ public void shutdownNow() {
+ producerExecutorService.shutdownNow();
+ consumerExecutorService.shutdownNow();
+ try {
+ queue.close();
+ } catch (IOException e) {
+      throw new HoodieIOException("Catch IOException while closing DisruptorMessageQueue", e);
+ }
+ }
+
+ @Override
+ public DisruptorMessageQueue<I, O> getQueue() {
+ return (DisruptorMessageQueue<I, O>)queue;
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorMessageQueue.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorMessageQueue.java
new file mode 100644
index 0000000000..eccd881af1
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorMessageQueue.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.queue;
+
+import com.lmax.disruptor.EventFactory;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.EventTranslator;
+import com.lmax.disruptor.RingBuffer;
+import com.lmax.disruptor.WaitStrategy;
+import com.lmax.disruptor.dsl.Disruptor;
+import com.lmax.disruptor.dsl.ProducerType;
+import org.apache.hudi.common.util.CustomizedThreadFactory;
+import org.apache.hudi.common.util.Option;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Function;
+
+public class DisruptorMessageQueue<I, O> implements HoodieMessageQueue<I, O> {
+
+ private static final Logger LOG =
LogManager.getLogger(DisruptorMessageQueue.class);
+
+ private final Disruptor<HoodieDisruptorEvent> queue;
+ private final Function<I, O> transformFunction;
+ private final RingBuffer<HoodieDisruptorEvent> ringBuffer;
+ private final Lock closeLocker = new ReentrantLock();
+
+ private boolean isDisruptorClosed = false;
+
+ public DisruptorMessageQueue(Option<Integer> bufferSize, Function<I, O>
transformFunction, Option<String> waitStrategyName, int totalProducers,
Runnable preExecuteRunnable) {
+ WaitStrategy waitStrategy = WaitStrategyFactory.build(waitStrategyName);
+ CustomizedThreadFactory threadFactory = new
CustomizedThreadFactory("disruptor", true, preExecuteRunnable);
+
+ this.queue = new Disruptor<>(new HoodieDisruptorEventFactory(),
bufferSize.get(), threadFactory, totalProducers > 1 ? ProducerType.MULTI :
ProducerType.SINGLE, waitStrategy);
+ this.ringBuffer = queue.getRingBuffer();
+ this.transformFunction = transformFunction;
+ }
+
+ @Override
+ public long size() {
+ return ringBuffer.getBufferSize() - ringBuffer.remainingCapacity();
+ }
+
+ @Override
+ public void insertRecord(I value) throws Exception {
+ O applied = transformFunction.apply(value);
+ EventTranslator<HoodieDisruptorEvent> translator = (event, sequence) ->
event.set(applied);
+ queue.getRingBuffer().publishEvent(translator);
+ }
+
+ @Override
+ public Option<O> readNextRecord() {
+ throw new UnsupportedOperationException("Should not call readNextRecord
here. And let DisruptorMessageHandler to handle consuming logic");
+ }
+
+ @Override
+ public void markAsFailed(Throwable e) {
+ // do nothing.
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return ringBuffer.getBufferSize() == ringBuffer.remainingCapacity();
+ }
+
+ @Override
+ public void close() {
+    closeLocker.lock();
+    try {
+      if (!isDisruptorClosed) {
+        queue.shutdown();
+        isDisruptorClosed = true;
+      }
+    } finally {
+      // always release the lock, even if queue.shutdown() throws
+      closeLocker.unlock();
+    }
+ }
+
+ protected void setHandlers(IteratorBasedQueueConsumer consumer) {
+ queue.handleEventsWith(new EventHandler<HoodieDisruptorEvent>() {
+
+ @Override
+ public void onEvent(HoodieDisruptorEvent event, long sequence, boolean
endOfBatch) throws Exception {
+ consumer.consumeOneRecord(event.get());
+ }
+ });
+ }
+
+ protected void start() {
+ queue.start();
+ }
+
+ /**
+ * HoodieDisruptorEventFactory is used to create/preallocate
HoodieDisruptorEvent.
+ *
+ */
+ class HoodieDisruptorEventFactory implements
EventFactory<HoodieDisruptorEvent> {
+
+ @Override
+ public HoodieDisruptorEvent newInstance() {
+ return new HoodieDisruptorEvent();
+ }
+ }
+
+ /**
+ * The unit of data passed from producer to consumer in disruptor world.
+ */
+ class HoodieDisruptorEvent {
+
+ private O value;
+
+ public void set(O value) {
+ this.value = value;
+ }
+
+ public O get() {
+ return this.value;
+ }
+ }
+}
+
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorWaitStrategyType.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorWaitStrategyType.java
new file mode 100644
index 0000000000..1a8e86835d
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorWaitStrategyType.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.queue;
+
+import org.apache.hudi.keygen.constant.KeyGeneratorType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public enum DisruptorWaitStrategyType {
+
+ /**
+ * The BlockingWaitStrategy is the slowest of the available wait strategies,
but is the most conservative with the respect to CPU usage
+ * and will give the most consistent behaviour across the widest variety of
deployment options.
+ */
+ BLOCKING_WAIT,
+
+ /**
+   * Like the `BlockingWaitStrategy`, the `SleepingWaitStrategy` attempts to be conservative with CPU usage by using a simple busy-wait loop.
+ * The difference is that the `SleepingWaitStrategy` uses a call to
`LockSupport.parkNanos(1)` in the middle of the loop.
+ * On a typical Linux system this will pause the thread for around 60µs.
+ */
+ SLEEPING_WAIT,
+
+ /**
+   * The `YieldingWaitStrategy` is one of two WaitStrategies that can be used in low-latency systems.
+ * It is designed for cases where there is the option to burn CPU cycles
with the goal of improving latency.
+ * The `YieldingWaitStrategy` will busy spin, waiting for the sequence to
increment to the appropriate value.
+ * Inside the body of the loop `Thread#yield()` will be called allowing
other queued threads to run.
+ * This is the recommended wait strategy when you need very high
performance, and the number of `EventHandler` threads is lower than the total
number of logical cores,
+ * e.g. you have hyper-threading enabled.
+ */
+ YIELDING_WAIT,
+
+ /**
+ * The `BusySpinWaitStrategy` is the highest performing WaitStrategy.
+ * Like the `YieldingWaitStrategy`, it can be used in low-latency systems,
but puts the highest constraints on the deployment environment.
+ */
+ BUSY_SPIN_WAIT;
+
+ public static List<String> getNames() {
+    List<String> names = new ArrayList<>(DisruptorWaitStrategyType.values().length);
+    Arrays.stream(DisruptorWaitStrategyType.values())
+        .forEach(x -> names.add(x.name()));
+ return names;
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueConsumer.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/ExecutorType.java
similarity index 50%
copy from
hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueConsumer.java
copy to
hudi-common/src/main/java/org/apache/hudi/common/util/queue/ExecutorType.java
index c34842fbe3..05ecb1746c 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueConsumer.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/ExecutorType.java
@@ -18,44 +18,32 @@
package org.apache.hudi.common.util.queue;
-import java.util.Iterator;
+import org.apache.hudi.keygen.constant.KeyGeneratorType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
/**
- * Consume entries from queue and execute callback function.
+ * Types of {@link org.apache.hudi.common.util.queue.HoodieExecutor}.
*/
-public abstract class BoundedInMemoryQueueConsumer<I, O> {
-
- /**
- * API to de-queue entries to memory bounded queue.
- *
- * @param queue In Memory bounded queue
- */
- public O consume(BoundedInMemoryQueue<?, I> queue) throws Exception {
- Iterator<I> iterator = queue.iterator();
-
- while (iterator.hasNext()) {
- consumeOneRecord(iterator.next());
- }
-
- // Notifies done
- finish();
-
- return getResult();
- }
-
- /**
- * Consumer One record.
- */
- protected abstract void consumeOneRecord(I record);
+public enum ExecutorType {
/**
- * Notifies implementation that we have exhausted consuming records from
queue.
+ * Executor which orchestrates concurrent producers and consumers
communicating through a bounded in-memory message queue using
LinkedBlockingQueue.
*/
- protected abstract void finish();
+ BOUNDED_IN_MEMORY,
/**
- * Return result of consuming records so far.
+ * Executor which orchestrates concurrent producers and consumers
communicating through disruptor as a lock free message queue
+ * to gain better writing performance. Note that DisruptorExecutor is still an experimental feature.
*/
- protected abstract O getResult();
+ DISRUPTOR;
+ public static List<String> getNames() {
+ List<String> names = new ArrayList<>(ExecutorType.values().length);
+    Arrays.stream(ExecutorType.values())
+        .forEach(x -> names.add(x.name()));
+ return names;
+ }
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/FunctionBasedQueueProducer.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/FunctionBasedQueueProducer.java
index 549683754c..df19915878 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/FunctionBasedQueueProducer.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/FunctionBasedQueueProducer.java
@@ -28,18 +28,18 @@ import java.util.function.Function;
*
* @param <I> Type of entry produced for queue
*/
-public class FunctionBasedQueueProducer<I> implements
BoundedInMemoryQueueProducer<I> {
+public class FunctionBasedQueueProducer<I> implements HoodieProducer<I> {
private static final Logger LOG =
LogManager.getLogger(FunctionBasedQueueProducer.class);
- private final Function<BoundedInMemoryQueue<I, ?>, Boolean> producerFunction;
+ private final Function<HoodieMessageQueue<I, ?>, Boolean> producerFunction;
- public FunctionBasedQueueProducer(Function<BoundedInMemoryQueue<I, ?>,
Boolean> producerFunction) {
+ public FunctionBasedQueueProducer(Function<HoodieMessageQueue<I, ?>,
Boolean> producerFunction) {
this.producerFunction = producerFunction;
}
@Override
- public void produce(BoundedInMemoryQueue<I, ?> queue) {
+ public void produce(HoodieMessageQueue<I, ?> queue) {
LOG.info("starting function which will enqueue records");
producerFunction.apply(queue);
LOG.info("finished function which will enqueue records");
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueProducer.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieConsumer.java
similarity index 69%
copy from
hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueProducer.java
copy to
hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieConsumer.java
index ecea9f2193..fb67fab6c7 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueProducer.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieConsumer.java
@@ -19,16 +19,12 @@
package org.apache.hudi.common.util.queue;
/**
- * Producer for {@link BoundedInMemoryQueue}. Memory Bounded Buffer supports
multiple producers single consumer pattern.
- *
- * @param <I> Input type for buffer items produced
+ * HoodieConsumer is used to consume records/messages from hoodie inner
message queue and write into DFS.
*/
-public interface BoundedInMemoryQueueProducer<I> {
+public interface HoodieConsumer<I, O> {
/**
- * API to enqueue entries to memory bounded queue.
- *
- * @param queue In Memory bounded queue
+ * Consume records from inner message queue.
*/
- void produce(BoundedInMemoryQueue<I, ?> queue) throws Exception;
+ O consume(HoodieMessageQueue<?, I> queue) throws Exception;
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueProducer.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieExecutor.java
similarity index 63%
copy from
hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueProducer.java
copy to
hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieExecutor.java
index ecea9f2193..7d51441ede 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueProducer.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieExecutor.java
@@ -18,17 +18,27 @@
package org.apache.hudi.common.util.queue;
+import java.io.Closeable;
+
/**
- * Producer for {@link BoundedInMemoryQueue}. Memory Bounded Buffer supports
multiple producers single consumer pattern.
- *
- * @param <I> Input type for buffer items produced
+ * HoodieExecutor which orchestrates concurrent producers and consumers
communicating through a bounded in message queue.
*/
-public interface BoundedInMemoryQueueProducer<I> {
+public interface HoodieExecutor<I, O, E> extends Closeable {
/**
- * API to enqueue entries to memory bounded queue.
- *
- * @param queue In Memory bounded queue
+ * Main API to
+ * 1. Set up and run all the production
+ * 2. Set up and run all the consumption.
+ * 3. Shutdown and return the result.
*/
- void produce(BoundedInMemoryQueue<I, ?> queue) throws Exception;
+ E execute();
+
+ boolean isRemaining();
+
+ /**
+ * Shutdown all the consumers and producers.
+ */
+ void shutdownNow();
+
+ boolean awaitTermination();
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieExecutorBase.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieExecutorBase.java
new file mode 100644
index 0000000000..8dc07e35e4
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieExecutorBase.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.queue;
+
+import org.apache.hudi.common.util.CustomizedThreadFactory;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * HoodieExecutorBase holds common elements producerExecutorService,
consumerExecutorService, producers and a single consumer.
+ * Also HoodieExecutorBase control the lifecycle of producerExecutorService
and consumerExecutorService.
+ */
+public abstract class HoodieExecutorBase<I, O, E> implements HoodieExecutor<I,
O, E> {
+
+ private static final Logger LOG =
LogManager.getLogger(HoodieExecutorBase.class);
+
+ private static final long TERMINATE_WAITING_TIME_SECS = 60L;
+ // Executor service used for launching write thread.
+ protected final ExecutorService producerExecutorService;
+ // Executor service used for launching read thread.
+ protected final ExecutorService consumerExecutorService;
+ // Producers
+ protected final List<HoodieProducer<I>> producers;
+ // Consumer
+ protected final Option<IteratorBasedQueueConsumer<O, E>> consumer;
+ // pre-execute function to implement environment specific behavior before
executors (producers/consumer) run
+ protected final Runnable preExecuteRunnable;
+
+ CompletableFuture<Void> producerFuture;
+
+ public HoodieExecutorBase(List<HoodieProducer<I>> producers,
Option<IteratorBasedQueueConsumer<O, E>> consumer,
+ Runnable preExecuteRunnable) {
+ this.producers = producers;
+ this.consumer = consumer;
+ this.preExecuteRunnable = preExecuteRunnable;
+ // Ensure fixed thread for each producer thread
+ this.producerExecutorService =
Executors.newFixedThreadPool(producers.size(), new
CustomizedThreadFactory("executor-queue-producer", preExecuteRunnable));
+ // Ensure single thread for consumer
+ this.consumerExecutorService = Executors.newSingleThreadExecutor(new
CustomizedThreadFactory("executor-queue-consumer", preExecuteRunnable));
+ }
+
+ /**
+ * Start all Producers.
+ */
+ public abstract CompletableFuture<Void> startProducers();
+
+ /**
+ * Start consumer.
+ */
+ protected abstract CompletableFuture<E> startConsumer();
+
+ /**
+ * Closing/cleaning up the executor's resources after consuming finished.
+ */
+ protected abstract void postAction();
+
+ /**
+ * get bounded in message queue.
+ */
+ public abstract HoodieMessageQueue<I, O> getQueue();
+
+ /**
+ * set all the resources for current HoodieExecutor before start to produce
and consume records.
+ */
+ protected abstract void setup();
+
+ @Override
+ public boolean awaitTermination() {
+ // if current thread has been interrupted before awaitTermination was
called, we still give
+ // executor a chance to proceeding. So clear the interrupt flag and reset
it if needed before return.
+ boolean interruptedBefore = Thread.interrupted();
+ boolean producerTerminated = false;
+ boolean consumerTerminated = false;
+ try {
+ producerTerminated =
producerExecutorService.awaitTermination(TERMINATE_WAITING_TIME_SECS,
TimeUnit.SECONDS);
+ consumerTerminated =
consumerExecutorService.awaitTermination(TERMINATE_WAITING_TIME_SECS,
TimeUnit.SECONDS);
+ } catch (InterruptedException ie) {
+ // fail silently for any other interruption
+ }
+ // reset interrupt flag if needed
+ if (interruptedBefore) {
+ Thread.currentThread().interrupt();
+ }
+ return producerTerminated && consumerTerminated;
+ }
+
+ @Override
+ public void close() {
+ if (!producerExecutorService.isShutdown()) {
+ producerExecutorService.shutdown();
+ }
+ if (!consumerExecutorService.isShutdown()) {
+ consumerExecutorService.shutdown();
+ }
+ }
+
+ /**
+ * Main API to run both production and consumption.
+ */
+ @Override
+ public E execute() {
+ try {
+ ValidationUtils.checkState(this.consumer.isPresent());
+ setup();
+ producerFuture = startProducers();
+ CompletableFuture<E> future = startConsumer();
+ return future.get();
+ } catch (InterruptedException ie) {
+ shutdownNow();
+ Thread.currentThread().interrupt();
+ throw new HoodieException(ie);
+ } catch (Exception e) {
+ throw new HoodieException(e);
+ } finally {
+ postAction();
+ }
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueProducer.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieIterableMessageQueue.java
similarity index 68%
copy from
hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueProducer.java
copy to
hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieIterableMessageQueue.java
index ecea9f2193..71ef39f2c1 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueProducer.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieIterableMessageQueue.java
@@ -18,17 +18,12 @@
package org.apache.hudi.common.util.queue;
+import java.util.Iterator;
+
/**
- * Producer for {@link BoundedInMemoryQueue}. Memory Bounded Buffer supports
multiple producers single consumer pattern.
- *
- * @param <I> Input type for buffer items produced
+ * HoodieIterableMessageQueue is a HoodieMessageQueue that additionally supports iterating over its elements.
*/
-public interface BoundedInMemoryQueueProducer<I> {
+public abstract class HoodieIterableMessageQueue<I, O> implements
HoodieMessageQueue<I, O>, Iterable<O> {
- /**
- * API to enqueue entries to memory bounded queue.
- *
- * @param queue In Memory bounded queue
- */
- void produce(BoundedInMemoryQueue<I, ?> queue) throws Exception;
+ public abstract Iterator<O> iterator();
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueProducer.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieMessageQueue.java
similarity index 53%
copy from
hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueProducer.java
copy to
hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieMessageQueue.java
index ecea9f2193..ae226f8adb 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueProducer.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieMessageQueue.java
@@ -18,17 +18,36 @@
package org.apache.hudi.common.util.queue;
+import org.apache.hudi.common.util.Option;
+import java.io.Closeable;
+
/**
- * Producer for {@link BoundedInMemoryQueue}. Memory Bounded Buffer supports
multiple producers single consumer pattern.
- *
- * @param <I> Input type for buffer items produced
+ * HoodieMessageQueue holds an internal message queue, and control the
behavior of
+ * 1. insert record into internal message queue.
+ * 2. get record from internal message queue.
+ * 3. close internal message queue.
*/
-public interface BoundedInMemoryQueueProducer<I> {
+public interface HoodieMessageQueue<I, O> extends Closeable {
+
+ /**
+ * Returns the number of elements in this queue.
+ */
+ long size();
+
+ /**
+ * Insert a record into inner message queue.
+ */
+ void insertRecord(I t) throws Exception;
+
+ /**
+ * Read records from inner message queue.
+ */
+ Option<O> readNextRecord();
/**
- * API to enqueue entries to memory bounded queue.
- *
- * @param queue In Memory bounded queue
+ * API to allow producers and consumer to communicate termination due to
failure.
*/
- void produce(BoundedInMemoryQueue<I, ?> queue) throws Exception;
+ void markAsFailed(Throwable e);
+
+ boolean isEmpty();
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueProducer.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieProducer.java
similarity index 76%
rename from
hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueProducer.java
rename to
hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieProducer.java
index ecea9f2193..f56dd4cce9 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueProducer.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieProducer.java
@@ -19,16 +19,16 @@
package org.apache.hudi.common.util.queue;
/**
- * Producer for {@link BoundedInMemoryQueue}. Memory Bounded Buffer supports
multiple producers single consumer pattern.
+ * Producer for {@link HoodieMessageQueue}. Memory Bounded Buffer supports
multiple producers single consumer pattern.
*
* @param <I> Input type for buffer items produced
*/
-public interface BoundedInMemoryQueueProducer<I> {
+public interface HoodieProducer<I> {
/**
- * API to enqueue entries to memory bounded queue.
+   * API to enqueue entries to the bounded queue.
*
* @param queue In Memory bounded queue
*/
- void produce(BoundedInMemoryQueue<I, ?> queue) throws Exception;
+ void produce(HoodieMessageQueue<I, ?> queue) throws Exception;
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueConsumer.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/IteratorBasedQueueConsumer.java
similarity index 81%
rename from
hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueConsumer.java
rename to
hudi-common/src/main/java/org/apache/hudi/common/util/queue/IteratorBasedQueueConsumer.java
index c34842fbe3..713d650464 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueConsumer.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/IteratorBasedQueueConsumer.java
@@ -23,15 +23,17 @@ import java.util.Iterator;
/**
* Consume entries from queue and execute callback function.
*/
-public abstract class BoundedInMemoryQueueConsumer<I, O> {
+public abstract class IteratorBasedQueueConsumer<I, O> implements
HoodieConsumer<I, O> {
/**
* API to de-queue entries to memory bounded queue.
*
* @param queue In Memory bounded queue
*/
- public O consume(BoundedInMemoryQueue<?, I> queue) throws Exception {
- Iterator<I> iterator = queue.iterator();
+ @Override
+ public O consume(HoodieMessageQueue<?, I> queue) throws Exception {
+
+ Iterator<I> iterator = ((HoodieIterableMessageQueue) queue).iterator();
while (iterator.hasNext()) {
consumeOneRecord(iterator.next());
@@ -46,12 +48,12 @@ public abstract class BoundedInMemoryQueueConsumer<I, O> {
/**
* Consumer One record.
*/
- protected abstract void consumeOneRecord(I record);
+ public abstract void consumeOneRecord(I record);
/**
* Notifies implementation that we have exhausted consuming records from
queue.
*/
- protected abstract void finish();
+ public void finish(){}
/**
* Return result of consuming records so far.
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/IteratorBasedQueueProducer.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/IteratorBasedQueueProducer.java
index 3d11f38e5c..7904fd61eb 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/IteratorBasedQueueProducer.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/IteratorBasedQueueProducer.java
@@ -28,7 +28,7 @@ import java.util.Iterator;
*
* @param <I> Item type produced for the buffer.
*/
-public class IteratorBasedQueueProducer<I> implements
BoundedInMemoryQueueProducer<I> {
+public class IteratorBasedQueueProducer<I> implements HoodieProducer<I> {
private static final Logger LOG =
LogManager.getLogger(IteratorBasedQueueProducer.class);
@@ -40,7 +40,7 @@ public class IteratorBasedQueueProducer<I> implements
BoundedInMemoryQueueProduc
}
@Override
- public void produce(BoundedInMemoryQueue<I, ?> queue) throws Exception {
+ public void produce(HoodieMessageQueue<I, ?> queue) throws Exception {
LOG.info("starting to buffer records");
while (inputIterator.hasNext()) {
queue.insertRecord(inputIterator.next());
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/WaitStrategyFactory.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/WaitStrategyFactory.java
new file mode 100644
index 0000000000..8137d2a136
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/WaitStrategyFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.queue;
+
+import static
org.apache.hudi.common.util.queue.DisruptorWaitStrategyType.BLOCKING_WAIT;
+
+import com.lmax.disruptor.BlockingWaitStrategy;
+import com.lmax.disruptor.BusySpinWaitStrategy;
+import com.lmax.disruptor.SleepingWaitStrategy;
+import com.lmax.disruptor.WaitStrategy;
+import com.lmax.disruptor.YieldingWaitStrategy;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+
+public class WaitStrategyFactory {
+
+ public static final String DEFAULT_STRATEGY = BLOCKING_WAIT.name();
+
+ /**
+ * Build WaitStrategy for disruptor
+ */
+ public static WaitStrategy build(Option<String> name) {
+ DisruptorWaitStrategyType strategyType = name.isPresent() ?
DisruptorWaitStrategyType.valueOf(name.get().toUpperCase()) : BLOCKING_WAIT;
+
+ switch (strategyType) {
+ case BLOCKING_WAIT:
+ return new BlockingWaitStrategy();
+ case SLEEPING_WAIT:
+ return new SleepingWaitStrategy();
+ case YIELDING_WAIT:
+ return new YieldingWaitStrategy();
+ case BUSY_SPIN_WAIT:
+ return new BusySpinWaitStrategy();
+ default:
+ throw new HoodieException("Unsupported Executor Type " + name);
+ }
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
index f44ec67e5e..2f70a10077 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
@@ -28,7 +28,7 @@ import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
-import org.apache.hudi.common.util.queue.BoundedInMemoryQueueProducer;
+import org.apache.hudi.common.util.queue.HoodieProducer;
import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
@@ -229,8 +229,8 @@ public class FormatUtils {
/**
* Setup log and parquet reading in parallel. Both write to central buffer.
*/
- private List<BoundedInMemoryQueueProducer<HoodieRecord<?>>>
getParallelProducers() {
- List<BoundedInMemoryQueueProducer<HoodieRecord<?>>> producers = new
ArrayList<>();
+ private List<HoodieProducer<HoodieRecord<?>>> getParallelProducers() {
+ List<HoodieProducer<HoodieRecord<?>>> producers = new ArrayList<>();
producers.add(new FunctionBasedQueueProducer<>(buffer -> {
scanner.scan();
return null;
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
index 84c8088650..700a87cbb7 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
@@ -24,8 +24,8 @@ import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
-import org.apache.hudi.common.util.queue.BoundedInMemoryQueueProducer;
import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
+import org.apache.hudi.common.util.queue.HoodieProducer;
import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
import org.apache.hudi.hadoop.RecordReaderValueIterator;
import org.apache.hudi.hadoop.SafeParquetRecordReaderWrapper;
@@ -104,8 +104,8 @@ class RealtimeUnmergedRecordReader extends
AbstractRealtimeRecordReader
/**
* Setup log and parquet reading in parallel. Both write to central buffer.
*/
- private List<BoundedInMemoryQueueProducer<ArrayWritable>>
getParallelProducers() {
- List<BoundedInMemoryQueueProducer<ArrayWritable>> producers = new
ArrayList<>();
+ private List<HoodieProducer<ArrayWritable>> getParallelProducers() {
+ List<HoodieProducer<ArrayWritable>> producers = new ArrayList<>();
producers.add(new FunctionBasedQueueProducer<>(buffer -> {
logRecordScanner.scan();
return null;
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/BoundInMemoryExecutorBenchmark.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/BoundInMemoryExecutorBenchmark.scala
new file mode 100644
index 0000000000..b1d2517374
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/BoundInMemoryExecutorBenchmark.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.HoodieSparkUtils
+import org.apache.spark.SparkConf
+import org.apache.spark.hudi.benchmark.{HoodieBenchmark, HoodieBenchmarkBase}
+import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.{DataFrame, RowFactory, SaveMode, SparkSession}
+
+import scala.util.Random
+
+object BoundInMemoryExecutorBenchmark extends HoodieBenchmarkBase {
+
+ protected val spark: SparkSession = getSparkSession
+
+ val recordNumber = 1000000
+
+ def getSparkSession: SparkSession = SparkSession.builder()
+ .master("local[*]")
+ .appName(this.getClass.getCanonicalName)
+ .withExtensions(new HoodieSparkSessionExtension)
+ .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ .config("spark.sql.session.timeZone", "CTT")
+ .config(sparkConf())
+ .getOrCreate()
+
+ def sparkConf(): SparkConf = {
+ val sparkConf = new SparkConf()
+ if (HoodieSparkUtils.gteqSpark3_2) {
+ sparkConf.set("spark.sql.catalog.spark_catalog",
+ "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
+ }
+ sparkConf
+ }
+
+ private def createDataFrame(number: Int): DataFrame = {
+ val schema = new StructType()
+ .add("c1", IntegerType)
+ .add("c2", StringType)
+
+ val rdd = spark.sparkContext.parallelize(0 to number, 2).map { item =>
+ val c1 = Integer.valueOf(item)
+ val c2 = s"abc"
+ RowFactory.create(c1, c2)
+ }
+ spark.createDataFrame(rdd, schema)
+ }
+
+ /**
+ * OpenJDK 64-Bit Server VM 1.8.0_161-b14 on Linux 3.10.0-693.21.1.el7.x86_64
+ * Intel(R) Xeon(R) Platinum 8259CL CPU @ 2.50GHz
+ * COW Ingestion: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
+ *
------------------------------------------------------------------------------------------------------------------------
+ * BoundInMemory Executor 5629 5765
192 0.2 5628.9 1.0X
+ * Disruptor Executor 2772 2862
127 0.4 2772.2 2.0X
+ *
+ */
+ private def cowTableDisruptorExecutorBenchmark(tableName: String =
"executorBenchmark"): Unit = {
+ val df = createDataFrame(recordNumber)
+ withTempDir {f =>
+ val benchmark = new HoodieBenchmark("COW Ingestion", recordNumber)
+ benchmark.addCase("BoundInMemory Executor") { _ =>
+ val finalTableName = tableName + Random.nextInt(10000)
+ df.write.format("hudi")
+ .mode(SaveMode.Overwrite)
+ .option("hoodie.datasource.write.recordkey.field", "c1")
+ .option("hoodie.datasource.write.partitionpath.field", "c2")
+ .option("hoodie.table.name", finalTableName)
+ .option("hoodie.metadata.enable", "false")
+ .option("hoodie.clean.automatic", "false")
+ .option("hoodie.bulkinsert.sort.mode", "NONE")
+ .option("hoodie.insert.shuffle.parallelism", "2")
+ .option("hoodie.datasource.write.operation", "bulk_insert")
+ .option("hoodie.datasource.write.row.writer.enable", "false")
+ .option("hoodie.bulkinsert.shuffle.parallelism", "1")
+ .option("hoodie.upsert.shuffle.parallelism", "2")
+ .option("hoodie.delete.shuffle.parallelism", "2")
+ .option("hoodie.populate.meta.fields", "false")
+ .option("hoodie.table.keygenerator.class",
"org.apache.hudi.keygen.SimpleKeyGenerator")
+ .save(new Path(f.getCanonicalPath, finalTableName).toUri.toString)
+ }
+
+ benchmark.addCase("Disruptor Executor") { _ =>
+ val finalTableName = tableName + Random.nextInt(10000)
+ df.write.format("hudi")
+ .mode(SaveMode.Overwrite)
+ .option("hoodie.datasource.write.recordkey.field", "c1")
+ .option("hoodie.datasource.write.partitionpath.field", "c2")
+ .option("hoodie.table.name", finalTableName)
+ .option("hoodie.metadata.enable", "false")
+ .option("hoodie.clean.automatic", "false")
+ .option("hoodie.bulkinsert.sort.mode", "NONE")
+ .option("hoodie.insert.shuffle.parallelism", "2")
+ .option("hoodie.datasource.write.operation", "bulk_insert")
+ .option("hoodie.datasource.write.row.writer.enable", "false")
+ .option("hoodie.bulkinsert.shuffle.parallelism", "1")
+ .option("hoodie.upsert.shuffle.parallelism", "2")
+ .option("hoodie.delete.shuffle.parallelism", "2")
+ .option("hoodie.write.executor.type", "DISRUPTOR")
+ .option("hoodie.populate.meta.fields", "false")
+ .option("hoodie.table.keygenerator.class",
"org.apache.hudi.keygen.SimpleKeyGenerator")
+
+ .save(new Path(f.getCanonicalPath, finalTableName).toUri.toString)
+ }
+ benchmark.run()
+ }
+ }
+
+ override def afterAll(): Unit = {
+ spark.stop()
+ }
+
+ override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+ cowTableDisruptorExecutorBenchmark()
+ }
+}
diff --git a/pom.xml b/pom.xml
index e97647b119..3b02c916f7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -196,6 +196,7 @@
<trino.bundle.bootstrap.shade.prefix>org.apache.hudi.</trino.bundle.bootstrap.shade.prefix>
<shadeSources>true</shadeSources>
<zk-curator.version>2.7.1</zk-curator.version>
+ <disruptor.version>3.4.2</disruptor.version>
<antlr.version>4.7</antlr.version>
<aws.sdk.version>1.12.22</aws.sdk.version>
<proto.version>3.21.5</proto.version>