alexeykudinkin commented on code in PR #7174: URL: https://github.com/apache/hudi/pull/7174#discussion_r1046175646
########## hudi-common/src/main/java/org/apache/hudi/common/util/queue/SimpleHoodieExecutor.java: ########## @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.util.queue; + +import static org.apache.hudi.common.util.ValidationUtils.checkState; + +import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieException; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.util.Iterator; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; + +/** + * Single Writer and Single Reader mode. Also this SimpleHoodieExecutor has no inner message queue and no inner lock. + * Consuming and writing records from iterator directly. + * + * Compared with queue based Executor + * Advantages: there is no need for additional memory and cpu resources due to lock or multithreading. + * Disadvantages: lost some benefits such as speed limit. And maybe lower throughput. 
+ */ +public class SimpleHoodieExecutor<I, O, E> implements HoodieExecutor<E> { + + private static final Logger LOG = LogManager.getLogger(SimpleHoodieExecutor.class); + + // Consumer + protected final Option<HoodieConsumer<O, E>> consumer; + // records iterator + protected final Iterator<I> it; + private final Function<I, O> transformFunction; + private final Runnable preExecuteRunnable; Review Comment: We don't need this one (this one is only required for those that spin up new threads) ########## hudi-common/src/main/java/org/apache/hudi/common/util/queue/SimpleHoodieExecutor.java: ########## @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.util.queue; + +import static org.apache.hudi.common.util.ValidationUtils.checkState; + +import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieException; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.util.Iterator; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; + +/** + * Single Writer and Single Reader mode. Also this SimpleHoodieExecutor has no inner message queue and no inner lock. 
+ * Consuming and writing records from iterator directly. + * + * Compared with queue based Executor + * Advantages: there is no need for additional memory and cpu resources due to lock or multithreading. + * Disadvantages: lost some benefits such as speed limit. And maybe lower throughput. + */ +public class SimpleHoodieExecutor<I, O, E> implements HoodieExecutor<E> { + + private static final Logger LOG = LogManager.getLogger(SimpleHoodieExecutor.class); + + // Consumer + protected final Option<HoodieConsumer<O, E>> consumer; + // records iterator + protected final Iterator<I> it; + private final Function<I, O> transformFunction; + private final Runnable preExecuteRunnable; + private final AtomicBoolean isWriteDone = new AtomicBoolean(false); + private final AtomicBoolean isShutdown = new AtomicBoolean(false); + + public SimpleHoodieExecutor(final Iterator<I> inputItr, HoodieConsumer<O, E> consumer, + Function<I, O> transformFunction, Runnable preExecuteRunnable) { + this(inputItr, Option.of(consumer), transformFunction, preExecuteRunnable); + } + + public SimpleHoodieExecutor(final Iterator<I> inputItr, Option<HoodieConsumer<O, E>> consumer, + Function<I, O> transformFunction, Runnable preExecuteRunnable) { + this.it = inputItr; + this.consumer = consumer; + this.transformFunction = transformFunction; + this.preExecuteRunnable = preExecuteRunnable; + } + + /** + * Consuming records from input iterator directly without any producers and inner message queue. 
+ */ + @Override + public E execute() { + checkState(this.consumer.isPresent()); + + try { + LOG.info("Starting consumer, consuming records from the records iterator directly"); + preExecuteRunnable.run(); + while (it.hasNext()) { + if (isShutdown.get()) { Review Comment: I don't think we need this check since - We don't expect to shut down the executor in the middle of iteration (in the normal course) - In case of exception this would be interrupted naturally ########## hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestSimpleExecutionInSpark.java: ########## @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.execution; + +import static org.apache.hudi.execution.HoodieLazyInsertIterable.getTransformFunction; + +import org.apache.avro.generic.IndexedRecord; +import org.apache.hudi.common.model.HoodieAvroRecord; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.queue.HoodieConsumer; +import org.apache.hudi.common.util.queue.SimpleHoodieExecutor; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.testutils.HoodieClientTestHarness; +import org.apache.spark.TaskContext; +import org.apache.spark.TaskContext$; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import scala.Tuple2; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestSimpleExecutionInSpark extends HoodieClientTestHarness { Review Comment: Yeah, that makes sense ########## hudi-common/src/main/java/org/apache/hudi/common/util/queue/SimpleHoodieExecutor.java: ########## @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.util.queue; + +import static org.apache.hudi.common.util.ValidationUtils.checkState; + +import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieException; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.util.Iterator; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; + +/** + * Single Writer and Single Reader mode. Also this SimpleHoodieExecutor has no inner message queue and no inner lock. + * Consuming and writing records from iterator directly. + * + * Compared with queue based Executor + * Advantages: there is no need for additional memory and cpu resources due to lock or multithreading. + * Disadvantages: lost some benefits such as speed limit. And maybe lower throughput. 
+ */ +public class SimpleHoodieExecutor<I, O, E> implements HoodieExecutor<E> { + + private static final Logger LOG = LogManager.getLogger(SimpleHoodieExecutor.class); + + // Consumer + protected final Option<HoodieConsumer<O, E>> consumer; + // records iterator + protected final Iterator<I> it; + private final Function<I, O> transformFunction; + private final Runnable preExecuteRunnable; + private final AtomicBoolean isWriteDone = new AtomicBoolean(false); + private final AtomicBoolean isShutdown = new AtomicBoolean(false); Review Comment: There's actually no state for us to clean up so we can just kill these 2 ########## hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/BoundInMemoryExecutorBenchmark.scala: ########## @@ -66,18 +66,18 @@ object BoundInMemoryExecutorBenchmark extends HoodieBenchmarkBase { } /** - * OpenJDK 64-Bit Server VM 1.8.0_161-b14 on Linux 3.10.0-693.21.1.el7.x86_64 + * OpenJDK 64-Bit Server VM 1.8.0_342-b07 on Linux 5.10.62-55.141.amzn2.x86_64 * Intel(R) Xeon(R) Platinum 8259CL CPU @ 2.50GHz * COW Ingestion: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative * ------------------------------------------------------------------------------------------------------------------------ - * BoundInMemory Executor 5629 5765 192 0.2 5628.9 1.0X - * Disruptor Executor 2772 2862 127 0.4 2772.2 2.0X - * + * BoundInMemory Executor 34661 35143 292 0.3 3466.1 1.0X + * Simple Executor 17347 17796 681 0.6 1734.7 2.0X + * Disruptor Executor 15803 16535 936 0.6 1580.3 2.2X Review Comment: Let's amend the commentary at the top that this benchmark has been run w/ unconstrained parallelism which obviously is beneficial to Disruptor more than it's for Simple ########## hudi-common/src/main/java/org/apache/hudi/common/util/queue/SimpleHoodieExecutor.java: ########## @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.util.queue; + +import static org.apache.hudi.common.util.ValidationUtils.checkState; + +import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieException; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.util.Iterator; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; + +/** + * Single Writer and Single Reader mode. Also this SimpleHoodieExecutor has no inner message queue and no inner lock. + * Consuming and writing records from iterator directly. + * + * Compared with queue based Executor + * Advantages: there is no need for additional memory and cpu resources due to lock or multithreading. + * Disadvantages: lost some benefits such as speed limit. And maybe lower throughput. 
+ */ +public class SimpleHoodieExecutor<I, O, E> implements HoodieExecutor<E> { + + private static final Logger LOG = LogManager.getLogger(SimpleHoodieExecutor.class); + + // Consumer + protected final Option<HoodieConsumer<O, E>> consumer; + // records iterator + protected final Iterator<I> it; + private final Function<I, O> transformFunction; + private final Runnable preExecuteRunnable; + private final AtomicBoolean isWriteDone = new AtomicBoolean(false); + private final AtomicBoolean isShutdown = new AtomicBoolean(false); + + public SimpleHoodieExecutor(final Iterator<I> inputItr, HoodieConsumer<O, E> consumer, + Function<I, O> transformFunction, Runnable preExecuteRunnable) { + this(inputItr, Option.of(consumer), transformFunction, preExecuteRunnable); + } + + public SimpleHoodieExecutor(final Iterator<I> inputItr, Option<HoodieConsumer<O, E>> consumer, + Function<I, O> transformFunction, Runnable preExecuteRunnable) { + this.it = inputItr; + this.consumer = consumer; + this.transformFunction = transformFunction; + this.preExecuteRunnable = preExecuteRunnable; + } + + /** + * Consuming records from input iterator directly without any producers and inner message queue. 
+ */ + @Override + public E execute() { + checkState(this.consumer.isPresent()); + + try { + LOG.info("Starting consumer, consuming records from the records iterator directly"); + preExecuteRunnable.run(); + while (it.hasNext()) { + if (isShutdown.get()) { + LOG.warn("Call shutdown while getting new entries, stop consuming."); + break; + } + O payload = transformFunction.apply(it.next()); + consumer.get().consume(payload); + } + + return consumer.get().finish(); + } catch (Exception e) { + LOG.error("Error consuming records in SimpleHoodieExecutor", e); + throw new HoodieException(e); + } finally { + isWriteDone.set(true); + } + } + + @Override + public void shutdownNow() { + isShutdown.set(true); + } + + @Override + public boolean awaitTermination() { + return isWriteDone.get(); + } + + public boolean isRunning() { Review Comment: I don't think we need this one (we only use it in test, and can actually avoid that assertion altogether, since we're asserting # of records) ########## hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestSimpleExecutionInSpark.java: ########## @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.execution; + +import static org.apache.hudi.execution.HoodieLazyInsertIterable.getTransformFunction; + +import org.apache.avro.generic.IndexedRecord; +import org.apache.hudi.common.model.HoodieAvroRecord; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.queue.HoodieConsumer; +import org.apache.hudi.common.util.queue.SimpleHoodieExecutor; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.testutils.HoodieClientTestHarness; +import org.apache.spark.TaskContext; +import org.apache.spark.TaskContext$; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import scala.Tuple2; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestSimpleExecutionInSpark extends HoodieClientTestHarness { + + private final String instantTime = HoodieActiveTimeline.createNewInstantTime(); + + @BeforeEach + public void setUp() throws Exception { + initTestDataGenerator(); + initExecutorServiceWithFixedThreadPool(2); + } + + @AfterEach + public void tearDown() throws Exception { + cleanupResources(); + } + + private Runnable getPreExecuteRunnable() { + final TaskContext taskContext = TaskContext.get(); + return () -> 
TaskContext$.MODULE$.setTaskContext(taskContext); + } + + @Test + public void testExecutor() { + + final List<HoodieRecord> hoodieRecords = dataGen.generateInserts(instantTime, 128); + final List<HoodieRecord> consumedRecords = new ArrayList<>(); + + HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class); + when(hoodieWriteConfig.getDisruptorWriteBufferSize()).thenReturn(Option.of(8)); + HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> consumer = + new HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>() { + private int count = 0; + + @Override + public void consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> record) throws Exception { + consumedRecords.add(record.record); + count++; + } + + @Override + public Integer finish() { + return count; + } + }; + SimpleHoodieExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> exec = null; + + try { + exec = new SimpleHoodieExecutor(hoodieRecords.iterator(), consumer, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA), getPreExecuteRunnable()); + + + int result = exec.execute(); + // It should buffer and write 100 records + assertEquals(128, result); + // There should be no remaining records in the buffer + assertFalse(exec.isRunning()); + + // collect all records and assert that consumed records are identical to produced ones + // assert there's no tampering, and that the ordering is preserved + assertEquals(hoodieRecords, consumedRecords); + for (int i = 0; i < hoodieRecords.size(); i++) { + assertEquals(hoodieRecords.get(i), consumedRecords.get(i)); + } + + } finally { + if (exec != null) { + exec.shutdownNow(); + } + } + } + + // Test to ensure that we are reading all records from queue iterator in the same order + // without any exceptions. 
+ @SuppressWarnings("unchecked") + @Test + @Timeout(value = 60) + public void testRecordReading() { + + final List<HoodieRecord> hoodieRecords = dataGen.generateInserts(instantTime, 100); + ArrayList<HoodieRecord> beforeRecord = new ArrayList<>(); + ArrayList<IndexedRecord> beforeIndexedRecord = new ArrayList<>(); + ArrayList<HoodieAvroRecord> afterRecord = new ArrayList<>(); + ArrayList<IndexedRecord> afterIndexedRecord = new ArrayList<>(); + + hoodieRecords.forEach(record -> { + final HoodieAvroRecord originalRecord = (HoodieAvroRecord) record; + beforeRecord.add(originalRecord); + try { + final Option<IndexedRecord> originalInsertValue = + originalRecord.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA); + beforeIndexedRecord.add(originalInsertValue.get()); + } catch (IOException e) { + // ignore exception here. + } + }); + + HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class); + when(hoodieWriteConfig.getDisruptorWriteBufferSize()).thenReturn(Option.of(16)); + HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer> consumer = + new HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, Integer>() { + private int count = 0; + + @Override + public void consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> record) throws Exception { + count++; + afterRecord.add((HoodieAvroRecord) record.record); + try { + IndexedRecord indexedRecord = (IndexedRecord)((HoodieAvroRecord) record.record) + .getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA).get(); + afterIndexedRecord.add(indexedRecord); + } catch (IOException e) { + //ignore exception here. 
+ } + } + + @Override + public Integer finish() { + return count; + } + }; + + SimpleHoodieExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> exec = null; + + try { + exec = new SimpleHoodieExecutor(hoodieRecords.iterator(), consumer, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA), getPreExecuteRunnable()); + int result = exec.execute(); + // It should buffer and write 100 records Review Comment: There's no buffering in here, let's update this one -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
