This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b31d5f7a4409 refactor: rewrite executors tests to avoid code
duplication (#18005)
b31d5f7a4409 is described below
commit b31d5f7a44093afeb79142dcff373af857ac0c6d
Author: yaojiejia <[email protected]>
AuthorDate: Tue Mar 10 23:39:34 2026 -0400
refactor: rewrite executors tests to avoid code duplication (#18005)
---
.../hudi/execution/BaseExecutorTestHarness.java | 324 +++++++++++++++++++++
.../TestBoundedInMemoryExecutorInSpark.java | 136 +++------
.../execution/TestDisruptorExecutionInSpark.java | 137 ++-------
.../hudi/execution/TestSimpleExecutionInSpark.java | 196 +------------
4 files changed, 395 insertions(+), 398 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/BaseExecutorTestHarness.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/BaseExecutorTestHarness.java
new file mode 100644
index 000000000000..70f1207d9e89
--- /dev/null
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/BaseExecutorTestHarness.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.testutils.InProcessTimeGenerator;
+import org.apache.hudi.common.util.queue.BaseHoodieQueueBasedExecutor;
+import org.apache.hudi.common.util.queue.HoodieConsumer;
+import org.apache.hudi.common.util.queue.HoodieExecutor;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.spark.TaskContext;
+import org.apache.spark.TaskContext$;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Base test harness for write executor tests.
+ * Provides common test logic to avoid duplication across different executor
implementations.
+ */
+public abstract class BaseExecutorTestHarness extends
HoodieSparkClientTestHarness {
+
+ protected final String instantTime =
InProcessTimeGenerator.createNewInstantTime();
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ initTestDataGenerator();
+ if (requiresExecutorService()) {
+ initExecutorServiceWithFixedThreadPool(2);
+ }
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ cleanupResources();
+ }
+
+ /**
+ * Create an executor instance for testing.
+ *
+ * @param records iterator of records to process
+ * @param consumer consumer to process records
+ * @return executor instance
+ */
+ protected abstract HoodieExecutor<Integer> createExecutor(
+ Iterator<HoodieRecord> records, HoodieConsumer<HoodieRecord, Integer>
consumer);
+
+ /**
+ * Whether the executor requires setting up an executor service.
+ * @return true if executor service is needed
+ */
+ protected boolean requiresExecutorService() {
+ return true;
+ }
+
+ /**
+ * Whether the executor supports interrupt testing.
+ * @return true if interrupt test should be run
+ */
+ protected boolean supportsInterruptTest() {
+ return false;
+ }
+
+ /**
+ * Whether the executor supports running status check.
+ * @return true if running status check is supported
+ */
+ protected boolean supportsRunningStatusCheck() {
+ return false;
+ }
+
+ /**
+ * Get pre-execute runnable for Spark task context propagation.
+ * @return runnable to execute before consuming records
+ */
+ protected Runnable getPreExecuteRunnable() {
+ final TaskContext taskContext = TaskContext.get();
+ return () -> TaskContext$.MODULE$.setTaskContext(taskContext);
+ }
+
+ /**
+ * Create a simple counting consumer.
+ *
+ * @param consumedRecords list to store consumed records
+ * @param <T> record type
+ * @return consumer instance
+ */
+ protected <T> HoodieConsumer<T, Integer> createCountingConsumer(final
List<T> consumedRecords) {
+ return new HoodieConsumer<T, Integer>() {
+ private int count = 0;
+
+ @Override
+ public void consume(T record) throws Exception {
+ consumedRecords.add(record);
+ count++;
+ }
+
+ @Override
+ public Integer finish() {
+ return count;
+ }
+ };
+ }
+
+ /**
+ * Create a consumer that waits indefinitely (for interrupt testing).
+ *
+ * @param <T> record type
+ * @return consumer instance
+ */
+ protected <T> HoodieConsumer<T, Integer> createWaitingConsumer() {
+ return new HoodieConsumer<T, Integer>() {
+ @Override
+ public void consume(T record) {
+ try {
+ synchronized (this) {
+ wait();
+ }
+ } catch (InterruptedException ie) {
+ // Expected for interrupt tests
+ }
+ }
+
+ @Override
+ public Integer finish() {
+ return 0;
+ }
+ };
+ }
+
+ @Test
+ public void testExecutor() {
+ final int numRecords = 128;
+ final List<HoodieRecord> hoodieRecords =
dataGen.generateInserts(instantTime, numRecords);
+ final List<HoodieRecord> consumedRecords = new ArrayList<>();
+
+ HoodieConsumer<HoodieRecord, Integer> consumer =
createCountingConsumer(consumedRecords);
+ HoodieExecutor<Integer> exec = null;
+
+ try {
+ exec = createExecutor(hoodieRecords.iterator(), consumer);
+ int result = exec.execute();
+
+ // Verify all records were processed
+ assertEquals(numRecords, result);
+
+ // Verify consumed records are identical to produced ones and ordering
is preserved
+ assertEquals(hoodieRecords, consumedRecords);
+
+ // Check running status if supported
+ if (supportsRunningStatusCheck()) {
+ BaseHoodieQueueBasedExecutor queueExec =
(BaseHoodieQueueBasedExecutor) exec;
+ assertFalse(queueExec.isRunning());
+ }
+
+ } finally {
+ if (exec != null) {
+ exec.shutdownNow();
+ }
+ }
+ }
+
+ @Test
+ @Timeout(value = 60)
+ public void testRecordReading() {
+ final int numRecords = 100;
+ final List<HoodieRecord> hoodieRecords =
dataGen.generateInserts(instantTime, numRecords);
+ ArrayList<HoodieRecord> beforeRecord = new ArrayList<>();
+ ArrayList<IndexedRecord> beforeIndexedRecord = new ArrayList<>();
+ ArrayList<HoodieRecord> afterRecord = new ArrayList<>();
+ ArrayList<IndexedRecord> afterIndexedRecord = new ArrayList<>();
+
+ hoodieRecords.forEach(record -> {
+ beforeRecord.add(record);
+ beforeIndexedRecord.add((IndexedRecord) record.getData());
+ });
+
+ HoodieConsumer<HoodieRecord, Integer> consumer =
+ new HoodieConsumer<HoodieRecord, Integer>() {
+ private int count = 0;
+
+ @Override
+ public void consume(HoodieRecord record) {
+ count++;
+ afterRecord.add(record);
+ afterIndexedRecord.add((IndexedRecord) record.getData());
+ }
+
+ @Override
+ public Integer finish() {
+ return count;
+ }
+ };
+
+ HoodieExecutor<Integer> exec = null;
+
+ try {
+ exec = createExecutor(hoodieRecords.iterator(), consumer);
+ int result = exec.execute();
+ assertEquals(numRecords, result);
+
+ assertEquals(beforeRecord, afterRecord);
+ assertEquals(beforeIndexedRecord, afterIndexedRecord);
+
+ } finally {
+ if (exec != null) {
+ exec.shutdownNow();
+ }
+ }
+ }
+
+ @Test
+ @Timeout(value = 60)
+ public void testException() {
+ final int numRecords = 1000;
+ final String errorMessage = "Exception when iterating records!!!";
+
+ List<HoodieRecord> pRecs = dataGen.generateInserts(instantTime,
numRecords);
+ Iterator<HoodieRecord> iterator = new ThrowingIterator(pRecs.iterator(),
errorMessage, numRecords / 10);
+
+ HoodieConsumer<HoodieRecord, Integer> consumer =
+ new HoodieConsumer<HoodieRecord, Integer>() {
+ int count = 0;
+
+ @Override
+ public void consume(HoodieRecord payload) throws Exception {
+ count++;
+ }
+
+ @Override
+ public Integer finish() {
+ return count;
+ }
+ };
+
+ HoodieExecutor<Integer> exec = createExecutor(iterator, consumer);
+
+ final Throwable thrown = assertThrows(HoodieException.class, exec::execute,
+ "exception is expected");
+ assertTrue(thrown.getMessage().contains(errorMessage));
+ }
+
+ @Test
+ @Timeout(value = 60)
+ public void testInterruptExecutor() {
+ if (!supportsInterruptTest()) {
+ return;
+ }
+
+ final List<HoodieRecord> hoodieRecords =
dataGen.generateInserts(instantTime, 100);
+ HoodieConsumer<HoodieRecord, Integer> consumer = createWaitingConsumer();
+
+ HoodieExecutor<Integer> executor =
createExecutor(hoodieRecords.iterator(), consumer);
+
+ try {
+ Thread.currentThread().interrupt();
+ assertThrows(HoodieException.class, executor::execute);
+ assertTrue(Thread.interrupted());
+ } catch (Exception e) {
+ // Expected
+ }
+ }
+
+ /**
+ * Iterator that throws an exception after a certain number of elements.
+ */
+ protected static class ThrowingIterator implements Iterator<HoodieRecord> {
+ private final Iterator<HoodieRecord> iterator;
+ private final AtomicInteger count = new AtomicInteger(0);
+ private final String errorMessage;
+ private final int errorMessageCount;
+
+ public ThrowingIterator(Iterator<HoodieRecord> iterator, String
errorMessage, int errorMessageCount) {
+ this.iterator = iterator;
+ this.errorMessage = errorMessage;
+ this.errorMessageCount = errorMessageCount;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public HoodieRecord next() {
+ if (count.get() == errorMessageCount) {
+ throw new HoodieException(errorMessage);
+ }
+ count.incrementAndGet();
+ return iterator.next();
+ }
+ }
+}
+
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java
index 0ff12fe19e89..26b21661dbf0 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java
@@ -19,71 +19,44 @@
package org.apache.hudi.execution;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.testutils.InProcessTimeGenerator;
-import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.common.util.queue.ExecutorType;
import org.apache.hudi.common.util.queue.HoodieConsumer;
+import org.apache.hudi.common.util.queue.HoodieExecutor;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
-
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.spark.TaskContext;
-import org.apache.spark.TaskContext$;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
+
import org.junit.jupiter.api.Test;
import java.util.Iterator;
-import java.util.List;
-import scala.Tuple2;
import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.HOODIE_SCHEMA;
import static
org.apache.hudi.execution.HoodieLazyInsertIterable.getTransformerInternal;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
-public class TestBoundedInMemoryExecutorInSpark extends
HoodieSparkClientTestHarness {
-
- private final String instantTime =
InProcessTimeGenerator.createNewInstantTime();
+/**
+ * Tests for {@link BoundedInMemoryExecutor}.
+ */
+public class TestBoundedInMemoryExecutorInSpark extends
BaseExecutorTestHarness {
private final HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
.withExecutorType(ExecutorType.BOUNDED_IN_MEMORY.name())
.withWriteBufferLimitBytes(1024)
.build(false);
- @BeforeEach
- public void setUp() throws Exception {
- initTestDataGenerator();
- }
-
- @AfterEach
- public void tearDown() throws Exception {
- cleanupResources();
- }
-
- private Runnable getPreExecuteRunnable() {
- final TaskContext taskContext = TaskContext.get();
- return () -> TaskContext$.MODULE$.setTaskContext(taskContext);
- }
-
- @Test
- public void testExecutor() {
-
- final int recordNumber = 100;
- final List<HoodieRecord> hoodieRecords =
dataGen.generateInserts(instantTime, recordNumber);
-
-
HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
Integer> consumer =
+ @Override
+ protected HoodieExecutor<Integer> createExecutor(
+ Iterator<HoodieRecord> records, HoodieConsumer<HoodieRecord, Integer>
consumer) {
+ // BoundedInMemoryExecutor with getTransformerInternal produces
HoodieInsertValueGenResult
+ @SuppressWarnings("rawtypes")
+
HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
Integer> wrappedConsumer =
new
HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
Integer>() {
-
private int count = 0;
@Override
- public void
consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>
record) {
+ public void
consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>
result) throws Exception {
+ // Extract the HoodieRecord from the result and pass to original
consumer
+ consumer.consume(result.getResult());
count++;
}
@@ -93,62 +66,30 @@ public class TestBoundedInMemoryExecutorInSpark extends
HoodieSparkClientTestHar
}
};
- BoundedInMemoryExecutor<HoodieRecord, Tuple2<HoodieRecord,
Option<IndexedRecord>>, Integer> executor = null;
- try {
- executor = new
BoundedInMemoryExecutor(writeConfig.getWriteBufferLimitBytes(),
hoodieRecords.iterator(), consumer,
- getTransformerInternal(HOODIE_SCHEMA, writeConfig),
getPreExecuteRunnable());
- int result = executor.execute();
-
- assertEquals(100, result);
- // There should be no remaining records in the buffer
- assertFalse(executor.isRunning());
- } finally {
- if (executor != null) {
- executor.shutdownNow();
- executor.awaitTermination();
- }
- }
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ BoundedInMemoryExecutor executor = new BoundedInMemoryExecutor(
+ writeConfig.getWriteBufferLimitBytes(),
+ records,
+ wrappedConsumer,
+ getTransformerInternal(HOODIE_SCHEMA, writeConfig),
+ getPreExecuteRunnable());
+
+ return executor;
}
- @Test
- public void testInterruptExecutor() {
- final List<HoodieRecord> hoodieRecords =
dataGen.generateInserts(instantTime, 100);
-
-
HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
Integer> consumer =
- new
HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
Integer>() {
-
- @Override
- public void
consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>
record) {
- try {
- while (true) {
- Thread.sleep(1000);
- }
- } catch (InterruptedException ie) {
- return;
- }
- }
-
- @Override
- public Integer finish() {
- return 0;
- }
- };
-
- BoundedInMemoryExecutor<HoodieRecord, Tuple2<HoodieRecord,
Option<IndexedRecord>>, Integer> executor =
- new BoundedInMemoryExecutor(writeConfig.getWriteBufferLimitBytes(),
hoodieRecords.iterator(), consumer,
- getTransformerInternal(HOODIE_SCHEMA, writeConfig),
getPreExecuteRunnable());
-
- // Interrupt the current thread (therefore triggering executor to throw as
soon as it
- // invokes [[get]] on the [[CompletableFuture]])
- Thread.currentThread().interrupt();
-
- assertThrows(HoodieException.class, executor::execute);
+ @Override
+ protected boolean requiresExecutorService() {
+ return false;
+ }
- // Validate that interrupted flag is reset, after [[InterruptedException]]
is thrown
- assertTrue(Thread.interrupted());
+ @Override
+ protected boolean supportsInterruptTest() {
+ return true;
+ }
- executor.shutdownNow();
- executor.awaitTermination();
+ @Override
+ protected boolean supportsRunningStatusCheck() {
+ return true;
}
@Test
@@ -165,6 +106,7 @@ public class TestBoundedInMemoryExecutorInSpark extends
HoodieSparkClientTestHar
}
};
+ @SuppressWarnings("rawtypes")
HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
Integer> consumer =
new
HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
Integer>() {
@Override
@@ -177,8 +119,10 @@ public class TestBoundedInMemoryExecutorInSpark extends
HoodieSparkClientTestHar
}
};
- BoundedInMemoryExecutor<HoodieRecord, Tuple2<HoodieRecord,
Option<IndexedRecord>>, Integer> executor =
- new BoundedInMemoryExecutor(writeConfig.getWriteBufferLimitBytes(),
unboundedRecordIter,
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ BoundedInMemoryExecutor executor =
+ new BoundedInMemoryExecutor(
+ writeConfig.getWriteBufferLimitBytes(), unboundedRecordIter,
consumer, getTransformerInternal(HOODIE_SCHEMA, writeConfig),
getPreExecuteRunnable());
executor.shutdownNow();
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorExecutionInSpark.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorExecutionInSpark.java
index 49e25c5db700..5cefabb8f7a8 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorExecutionInSpark.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorExecutionInSpark.java
@@ -19,138 +19,45 @@
package org.apache.hudi.execution;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.testutils.InProcessTimeGenerator;
import org.apache.hudi.common.util.queue.DisruptorExecutor;
import org.apache.hudi.common.util.queue.ExecutorType;
import org.apache.hudi.common.util.queue.HoodieConsumer;
+import org.apache.hudi.common.util.queue.HoodieExecutor;
import org.apache.hudi.common.util.queue.WaitStrategyFactory;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
-import org.apache.spark.TaskContext;
-import org.apache.spark.TaskContext$;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.Timeout;
-
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Iterator;
import java.util.function.Function;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class TestDisruptorExecutionInSpark extends
HoodieSparkClientTestHarness {
-
- private final String instantTime =
InProcessTimeGenerator.createNewInstantTime();
+/**
+ * Tests for {@link DisruptorExecutor}.
+ */
+public class TestDisruptorExecutionInSpark extends BaseExecutorTestHarness {
private final HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
.withExecutorType(ExecutorType.DISRUPTOR.name())
.withWriteExecutorDisruptorWriteBufferLimitBytes(8)
.build(false);
- @BeforeEach
- public void setUp() throws Exception {
- initTestDataGenerator();
- initExecutorServiceWithFixedThreadPool(2);
- }
-
- @AfterEach
- public void tearDown() throws Exception {
- cleanupResources();
+ @Override
+ protected HoodieExecutor<Integer> createExecutor(
+ Iterator<HoodieRecord> records, HoodieConsumer<HoodieRecord, Integer>
consumer) {
+ return new DisruptorExecutor<>(
+ writeConfig.getWriteExecutorDisruptorWriteBufferLimitBytes(),
+ records,
+ consumer,
+ Function.identity(),
+ WaitStrategyFactory.DEFAULT_STRATEGY,
+ getPreExecuteRunnable());
}
- private Runnable getPreExecuteRunnable() {
- final TaskContext taskContext = TaskContext.get();
- return () -> TaskContext$.MODULE$.setTaskContext(taskContext);
+ @Override
+ protected boolean supportsInterruptTest() {
+ return true;
}
- @Test
- public void testExecutor() {
-
- final List<HoodieRecord> hoodieRecords =
dataGen.generateInserts(instantTime, 128);
- final List<HoodieRecord> consumedRecords = new ArrayList<>();
-
- HoodieConsumer<HoodieRecord, Integer> consumer =
- new HoodieConsumer<HoodieRecord, Integer>() {
-
- private int count = 0;
-
- @Override
- public void consume(HoodieRecord record) {
- consumedRecords.add(record);
- count++;
- }
-
- @Override
- public Integer finish() {
- return count;
- }
- };
- DisruptorExecutor<HoodieRecord, HoodieRecord, Integer> exec = null;
-
- try {
- exec = new
DisruptorExecutor<>(writeConfig.getWriteExecutorDisruptorWriteBufferLimitBytes(),
hoodieRecords.iterator(), consumer,
- Function.identity(), WaitStrategyFactory.DEFAULT_STRATEGY,
getPreExecuteRunnable());
- int result = exec.execute();
- // It should buffer and write 100 records
- assertEquals(128, result);
- // There should be no remaining records in the buffer
- assertFalse(exec.isRunning());
-
- // collect all records and assert that consumed records are identical to
produced ones
- // assert there's no tampering, and that the ordering is preserved
- assertEquals(hoodieRecords, consumedRecords);
- for (int i = 0; i < hoodieRecords.size(); i++) {
- assertEquals(hoodieRecords.get(i), consumedRecords.get(i));
- }
-
- } finally {
- if (exec != null) {
- exec.shutdownNow();
- }
- }
- }
-
- @Test
- @Timeout(value = 60)
- public void testInterruptExecutor() {
- final List<HoodieRecord> hoodieRecords =
dataGen.generateInserts(instantTime, 100);
-
- HoodieConsumer<HoodieRecord, Integer> consumer =
- new HoodieConsumer<HoodieRecord, Integer>() {
-
- @Override
- public void consume(HoodieRecord record) {
- try {
- synchronized (this) {
- wait();
- }
- } catch (InterruptedException ie) {
- // ignore here
- }
- }
-
- @Override
- public Integer finish() {
- return 0;
- }
- };
-
- DisruptorExecutor<HoodieRecord, HoodieRecord, Integer>
- executor = new DisruptorExecutor<>(1024, hoodieRecords.iterator(),
consumer,
- Function.identity(), WaitStrategyFactory.DEFAULT_STRATEGY,
getPreExecuteRunnable());
-
- try {
- Thread.currentThread().interrupt();
- assertThrows(HoodieException.class, executor::execute);
- assertTrue(Thread.interrupted());
- } catch (Exception e) {
- // ignore here
- }
+ @Override
+ protected boolean supportsRunningStatusCheck() {
+ return true;
}
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestSimpleExecutionInSpark.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestSimpleExecutionInSpark.java
index 56e56a15b50f..77b512f3aaf6 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestSimpleExecutionInSpark.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestSimpleExecutionInSpark.java
@@ -19,199 +19,21 @@
package org.apache.hudi.execution;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.testutils.InProcessTimeGenerator;
import org.apache.hudi.common.util.queue.HoodieConsumer;
+import org.apache.hudi.common.util.queue.HoodieExecutor;
import org.apache.hudi.common.util.queue.SimpleExecutor;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
-import org.apache.avro.generic.IndexedRecord;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.Timeout;
-
-import java.util.ArrayList;
import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class TestSimpleExecutionInSpark extends HoodieSparkClientTestHarness {
-
- private final String instantTime =
InProcessTimeGenerator.createNewInstantTime();
-
- @BeforeEach
- public void setUp() throws Exception {
- initTestDataGenerator();
- initExecutorServiceWithFixedThreadPool(2);
- }
-
- @AfterEach
- public void tearDown() throws Exception {
- cleanupResources();
- }
-
- @Test
- public void testExecutor() {
-
- final List<HoodieRecord> hoodieRecords =
dataGen.generateInserts(instantTime, 128);
- final List<HoodieRecord> consumedRecords = new ArrayList<>();
-
- HoodieConsumer<HoodieRecord, Integer> consumer =
- new HoodieConsumer<HoodieRecord, Integer>() {
- private int count = 0;
-
- @Override
- public void consume(HoodieRecord record) throws Exception {
- consumedRecords.add(record);
- count++;
- }
-
- @Override
- public Integer finish() {
- return count;
- }
- };
- SimpleExecutor<HoodieRecord, HoodieRecord, Integer> exec = null;
-
- try {
- exec = new SimpleExecutor<>(hoodieRecords.iterator(), consumer,
Function.identity());
-
- int result = exec.execute();
- // It should buffer and write 128 records
- assertEquals(128, result);
-
- // collect all records and assert that consumed records are identical to
produced ones
- // assert there's no tampering, and that the ordering is preserved
- assertEquals(hoodieRecords, consumedRecords);
-
- } finally {
- if (exec != null) {
- exec.shutdownNow();
- }
- }
- }
-
- // Test to ensure that we are reading all records from queue iterator in the
same order
- // without any exceptions.
- @SuppressWarnings("unchecked")
- @Test
- @Timeout(value = 60)
- public void testRecordReading() {
-
- final List<HoodieRecord> hoodieRecords =
dataGen.generateInserts(instantTime, 100);
- ArrayList<HoodieRecord> beforeRecord = new ArrayList<>();
- ArrayList<IndexedRecord> beforeIndexedRecord = new ArrayList<>();
- ArrayList<HoodieRecord> afterRecord = new ArrayList<>();
- ArrayList<IndexedRecord> afterIndexedRecord = new ArrayList<>();
-
- hoodieRecords.forEach(record -> {
- beforeRecord.add(record);
- beforeIndexedRecord.add((IndexedRecord) record.getData());
- });
-
- HoodieConsumer<HoodieRecord, Integer> consumer =
- new HoodieConsumer<HoodieRecord, Integer>() {
- private int count = 0;
-
- @Override
- public void consume(HoodieRecord record) {
- count++;
- afterRecord.add(record);
- afterIndexedRecord.add((IndexedRecord) record.getData());
- }
-
- @Override
- public Integer finish() {
- return count;
- }
- };
-
- SimpleExecutor<HoodieRecord, HoodieRecord, Integer> exec = null;
-
- try {
- exec = new SimpleExecutor<>(hoodieRecords.iterator(), consumer,
Function.identity());
- int result = exec.execute();
- assertEquals(100, result);
-
- assertEquals(beforeRecord, afterRecord);
- assertEquals(beforeIndexedRecord, afterIndexedRecord);
-
- } finally {
- if (exec != null) {
- exec.shutdownNow();
- }
- }
- }
-
- /**
- * Test to ensure exception happen in iterator then we need to stop the
simple ingestion.
- */
- @SuppressWarnings("unchecked")
- @Test
- @Timeout(value = 60)
- public void testException() {
- final int numRecords = 1000;
- final String errorMessage = "Exception when iterating records!!!";
-
- List<HoodieRecord> pRecs = dataGen.generateInserts(instantTime,
numRecords);
- InnerIterator iterator = new InnerIterator(pRecs.iterator(), errorMessage,
numRecords / 10);
-
- HoodieConsumer<HoodieRecord, Integer> consumer =
- new HoodieConsumer<HoodieRecord, Integer>() {
- int count = 0;
-
- @Override
- public void consume(HoodieRecord payload) throws Exception {
- // Read recs and ensure we have covered all producer recs.
- count++;
- }
-
- @Override
- public Integer finish() {
- return count;
- }
- };
-
- SimpleExecutor<HoodieRecord, HoodieRecord, Integer> exec =
- new SimpleExecutor<>(iterator, consumer, Function.identity());
-
- final Throwable thrown = assertThrows(HoodieException.class, exec::execute,
- "exception is expected");
- assertTrue(thrown.getMessage().contains(errorMessage));
- }
-
- class InnerIterator implements Iterator<HoodieRecord> {
-
- private Iterator<HoodieRecord> iterator;
- private AtomicInteger count = new AtomicInteger(0);
- private String errorMessage;
- private int errorMessageCount;
-
- public InnerIterator(Iterator<HoodieRecord> iterator, String errorMessage,
int errorMessageCount) {
- this.iterator = iterator;
- this.errorMessage = errorMessage;
- this.errorMessageCount = errorMessageCount;
- }
-
- @Override
- public boolean hasNext() {
- return iterator.hasNext();
- }
-
- @Override
- public HoodieRecord next() {
- if (count.get() == errorMessageCount) {
- throw new HoodieException(errorMessage);
- }
+/**
+ * Tests for {@link SimpleExecutor}.
+ */
+public class TestSimpleExecutionInSpark extends BaseExecutorTestHarness {
- count.incrementAndGet();
- return iterator.next();
- }
+ @Override
+ protected HoodieExecutor<Integer> createExecutor(
+ Iterator<HoodieRecord> records, HoodieConsumer<HoodieRecord, Integer>
consumer) {
+ return new SimpleExecutor<>(records, consumer, Function.identity());
}
}