alexeykudinkin commented on code in PR #7174:
URL: https://github.com/apache/hudi/pull/7174#discussion_r1046175646


##########
hudi-common/src/main/java/org/apache/hudi/common/util/queue/SimpleHoodieExecutor.java:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.queue;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+
+/**
+ * Single Writer and Single Reader mode. Also this SimpleHoodieExecutor has no 
inner message queue and no inner lock.
+ * Consuming and writing records from iterator directly.
+ *
+ * Compared with queue based Executor
+ * Advantages: there is no need for additional memory and cpu resources due to 
lock or multithreading.
+ * Disadvantages: lost some benefits such as speed limit. And maybe lower 
throughput.
+ */
+public class SimpleHoodieExecutor<I, O, E> implements HoodieExecutor<E> {
+
+  private static final Logger LOG = 
LogManager.getLogger(SimpleHoodieExecutor.class);
+
+  // Consumer
+  protected final Option<HoodieConsumer<O, E>> consumer;
+  // records iterator
+  protected final Iterator<I> it;
+  private final Function<I, O> transformFunction;
+  private final Runnable preExecuteRunnable;

Review Comment:
   We don't need this one (it's only required for those executors that spin up new 
threads)



##########
hudi-common/src/main/java/org/apache/hudi/common/util/queue/SimpleHoodieExecutor.java:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.queue;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+
+/**
+ * Single Writer and Single Reader mode. Also this SimpleHoodieExecutor has no 
inner message queue and no inner lock.
+ * Consuming and writing records from iterator directly.
+ *
+ * Compared with queue based Executor
+ * Advantages: there is no need for additional memory and cpu resources due to 
lock or multithreading.
+ * Disadvantages: lost some benefits such as speed limit. And maybe lower 
throughput.
+ */
+public class SimpleHoodieExecutor<I, O, E> implements HoodieExecutor<E> {
+
+  private static final Logger LOG = 
LogManager.getLogger(SimpleHoodieExecutor.class);
+
+  // Consumer
+  protected final Option<HoodieConsumer<O, E>> consumer;
+  // records iterator
+  protected final Iterator<I> it;
+  private final Function<I, O> transformFunction;
+  private final Runnable preExecuteRunnable;
+  private final AtomicBoolean isWriteDone = new AtomicBoolean(false);
+  private final AtomicBoolean isShutdown = new AtomicBoolean(false);
+
+  public SimpleHoodieExecutor(final Iterator<I> inputItr, HoodieConsumer<O, E> 
consumer,
+                              Function<I, O> transformFunction, Runnable 
preExecuteRunnable) {
+    this(inputItr, Option.of(consumer), transformFunction, preExecuteRunnable);
+  }
+
+  public SimpleHoodieExecutor(final Iterator<I> inputItr, 
Option<HoodieConsumer<O, E>> consumer,
+                              Function<I, O> transformFunction, Runnable 
preExecuteRunnable) {
+    this.it = inputItr;
+    this.consumer = consumer;
+    this.transformFunction = transformFunction;
+    this.preExecuteRunnable = preExecuteRunnable;
+  }
+
+  /**
+   * Consuming records from input iterator directly without any producers and 
inner message queue.
+   */
+  @Override
+  public E execute() {
+    checkState(this.consumer.isPresent());
+
+    try {
+      LOG.info("Starting consumer, consuming records from the records iterator 
directly");
+      preExecuteRunnable.run();
+      while (it.hasNext()) {
+        if (isShutdown.get()) {

Review Comment:
   I don't think we need this check, since 
    - We don't expect to shut down the executor in the middle of iteration (in the 
normal course of events)
    - In case of an exception, the iteration would be interrupted naturally



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestSimpleExecutionInSpark.java:
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution;
+
+import static 
org.apache.hudi.execution.HoodieLazyInsertIterable.getTransformFunction;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.queue.HoodieConsumer;
+import org.apache.hudi.common.util.queue.SimpleHoodieExecutor;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.spark.TaskContext;
+import org.apache.spark.TaskContext$;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import scala.Tuple2;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestSimpleExecutionInSpark extends HoodieClientTestHarness {

Review Comment:
   Yeah, that makes sense



##########
hudi-common/src/main/java/org/apache/hudi/common/util/queue/SimpleHoodieExecutor.java:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.queue;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+
+/**
+ * Single Writer and Single Reader mode. Also this SimpleHoodieExecutor has no 
inner message queue and no inner lock.
+ * Consuming and writing records from iterator directly.
+ *
+ * Compared with queue based Executor
+ * Advantages: there is no need for additional memory and cpu resources due to 
lock or multithreading.
+ * Disadvantages: lost some benefits such as speed limit. And maybe lower 
throughput.
+ */
+public class SimpleHoodieExecutor<I, O, E> implements HoodieExecutor<E> {
+
+  private static final Logger LOG = 
LogManager.getLogger(SimpleHoodieExecutor.class);
+
+  // Consumer
+  protected final Option<HoodieConsumer<O, E>> consumer;
+  // records iterator
+  protected final Iterator<I> it;
+  private final Function<I, O> transformFunction;
+  private final Runnable preExecuteRunnable;
+  private final AtomicBoolean isWriteDone = new AtomicBoolean(false);
+  private final AtomicBoolean isShutdown = new AtomicBoolean(false);

Review Comment:
   There's actually no state for us to clean up, so we can just remove these two 
fields



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/BoundInMemoryExecutorBenchmark.scala:
##########
@@ -66,18 +66,18 @@ object BoundInMemoryExecutorBenchmark extends 
HoodieBenchmarkBase {
   }
 
   /**
-   * OpenJDK 64-Bit Server VM 1.8.0_161-b14 on Linux 3.10.0-693.21.1.el7.x86_64
+   * OpenJDK 64-Bit Server VM 1.8.0_342-b07 on Linux 
5.10.62-55.141.amzn2.x86_64
    * Intel(R) Xeon(R) Platinum 8259CL CPU @ 2.50GHz
    * COW Ingestion:                            Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    * 
------------------------------------------------------------------------------------------------------------------------
-   * BoundInMemory Executor                             5629           5765    
     192          0.2        5628.9       1.0X
-   * Disruptor Executor                                 2772           2862    
     127          0.4        2772.2       2.0X
-   *
+   * BoundInMemory Executor                            34661          35143    
     292          0.3        3466.1       1.0X
+   * Simple Executor                                   17347          17796    
     681          0.6        1734.7       2.0X
+   * Disruptor Executor                                15803          16535    
     936          0.6        1580.3       2.2X

Review Comment:
   Let's amend the commentary at the top to note that this benchmark has been run 
with unconstrained parallelism, which obviously benefits Disruptor more than it 
does Simple



##########
hudi-common/src/main/java/org/apache/hudi/common/util/queue/SimpleHoodieExecutor.java:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.queue;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+
+/**
+ * Single Writer and Single Reader mode. Also this SimpleHoodieExecutor has no 
inner message queue and no inner lock.
+ * Consuming and writing records from iterator directly.
+ *
+ * Compared with queue based Executor
+ * Advantages: there is no need for additional memory and cpu resources due to 
lock or multithreading.
+ * Disadvantages: lost some benefits such as speed limit. And maybe lower 
throughput.
+ */
+public class SimpleHoodieExecutor<I, O, E> implements HoodieExecutor<E> {
+
+  private static final Logger LOG = 
LogManager.getLogger(SimpleHoodieExecutor.class);
+
+  // Consumer
+  protected final Option<HoodieConsumer<O, E>> consumer;
+  // records iterator
+  protected final Iterator<I> it;
+  private final Function<I, O> transformFunction;
+  private final Runnable preExecuteRunnable;
+  private final AtomicBoolean isWriteDone = new AtomicBoolean(false);
+  private final AtomicBoolean isShutdown = new AtomicBoolean(false);
+
+  public SimpleHoodieExecutor(final Iterator<I> inputItr, HoodieConsumer<O, E> 
consumer,
+                              Function<I, O> transformFunction, Runnable 
preExecuteRunnable) {
+    this(inputItr, Option.of(consumer), transformFunction, preExecuteRunnable);
+  }
+
+  public SimpleHoodieExecutor(final Iterator<I> inputItr, 
Option<HoodieConsumer<O, E>> consumer,
+                              Function<I, O> transformFunction, Runnable 
preExecuteRunnable) {
+    this.it = inputItr;
+    this.consumer = consumer;
+    this.transformFunction = transformFunction;
+    this.preExecuteRunnable = preExecuteRunnable;
+  }
+
+  /**
+   * Consuming records from input iterator directly without any producers and 
inner message queue.
+   */
+  @Override
+  public E execute() {
+    checkState(this.consumer.isPresent());
+
+    try {
+      LOG.info("Starting consumer, consuming records from the records iterator 
directly");
+      preExecuteRunnable.run();
+      while (it.hasNext()) {
+        if (isShutdown.get()) {
+          LOG.warn("Call shutdown while getting new entries, stop consuming.");
+          break;
+        }
+        O payload = transformFunction.apply(it.next());
+        consumer.get().consume(payload);
+      }
+
+      return consumer.get().finish();
+    } catch (Exception e) {
+      LOG.error("Error consuming records in SimpleHoodieExecutor", e);
+      throw new HoodieException(e);
+    } finally {
+      isWriteDone.set(true);
+    }
+  }
+
+  @Override
+  public void shutdownNow() {
+    isShutdown.set(true);
+  }
+
+  @Override
+  public boolean awaitTermination() {
+    return isWriteDone.get();
+  }
+
+  public boolean isRunning() {

Review Comment:
   I don't think we need this one (we only use it in tests, and we can actually 
avoid that assertion altogether, since we're already asserting the # of records)



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestSimpleExecutionInSpark.java:
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution;
+
+import static 
org.apache.hudi.execution.HoodieLazyInsertIterable.getTransformFunction;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.queue.HoodieConsumer;
+import org.apache.hudi.common.util.queue.SimpleHoodieExecutor;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.spark.TaskContext;
+import org.apache.spark.TaskContext$;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import scala.Tuple2;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestSimpleExecutionInSpark extends HoodieClientTestHarness {
+
+  private final String instantTime = 
HoodieActiveTimeline.createNewInstantTime();
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    initTestDataGenerator();
+    initExecutorServiceWithFixedThreadPool(2);
+  }
+
+  @AfterEach
+  public void tearDown() throws Exception {
+    cleanupResources();
+  }
+
+  private Runnable getPreExecuteRunnable() {
+    final TaskContext taskContext = TaskContext.get();
+    return () -> TaskContext$.MODULE$.setTaskContext(taskContext);
+  }
+
+  @Test
+  public void testExecutor() {
+
+    final List<HoodieRecord> hoodieRecords = 
dataGen.generateInserts(instantTime, 128);
+    final List<HoodieRecord> consumedRecords = new ArrayList<>();
+
+    HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
+    
when(hoodieWriteConfig.getDisruptorWriteBufferSize()).thenReturn(Option.of(8));
+    
HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 Integer> consumer =
+        new 
HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 Integer>() {
+          private int count = 0;
+
+          @Override
+          public void 
consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> 
record) throws Exception {
+            consumedRecords.add(record.record);
+            count++;
+          }
+
+          @Override
+          public Integer finish() {
+            return count;
+          }
+        };
+    SimpleHoodieExecutor<HoodieRecord, Tuple2<HoodieRecord, 
Option<IndexedRecord>>, Integer> exec = null;
+
+    try {
+      exec = new SimpleHoodieExecutor(hoodieRecords.iterator(), consumer, 
getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA), 
getPreExecuteRunnable());
+
+
+      int result = exec.execute();
+      // It should buffer and write 100 records
+      assertEquals(128, result);
+      // There should be no remaining records in the buffer
+      assertFalse(exec.isRunning());
+
+      // collect all records and assert that consumed records are identical to 
produced ones
+      // assert there's no tampering, and that the ordering is preserved
+      assertEquals(hoodieRecords, consumedRecords);
+      for (int i = 0; i < hoodieRecords.size(); i++) {
+        assertEquals(hoodieRecords.get(i), consumedRecords.get(i));
+      }
+
+    } finally {
+      if (exec != null) {
+        exec.shutdownNow();
+      }
+    }
+  }
+
+  // Test to ensure that we are reading all records from queue iterator in the 
same order
+  // without any exceptions.
+  @SuppressWarnings("unchecked")
+  @Test
+  @Timeout(value = 60)
+  public void testRecordReading() {
+
+    final List<HoodieRecord> hoodieRecords = 
dataGen.generateInserts(instantTime, 100);
+    ArrayList<HoodieRecord> beforeRecord = new ArrayList<>();
+    ArrayList<IndexedRecord> beforeIndexedRecord = new ArrayList<>();
+    ArrayList<HoodieAvroRecord> afterRecord = new ArrayList<>();
+    ArrayList<IndexedRecord> afterIndexedRecord = new ArrayList<>();
+
+    hoodieRecords.forEach(record -> {
+      final HoodieAvroRecord originalRecord = (HoodieAvroRecord) record;
+      beforeRecord.add(originalRecord);
+      try {
+        final Option<IndexedRecord> originalInsertValue =
+            
originalRecord.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA);
+        beforeIndexedRecord.add(originalInsertValue.get());
+      } catch (IOException e) {
+        // ignore exception here.
+      }
+    });
+
+    HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
+    
when(hoodieWriteConfig.getDisruptorWriteBufferSize()).thenReturn(Option.of(16));
+    
HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 Integer> consumer =
+        new 
HoodieConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>,
 Integer>() {
+          private int count = 0;
+
+          @Override
+          public void 
consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> 
record) throws Exception {
+            count++;
+            afterRecord.add((HoodieAvroRecord) record.record);
+            try {
+              IndexedRecord indexedRecord = (IndexedRecord)((HoodieAvroRecord) 
record.record)
+                  
.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA).get();
+              afterIndexedRecord.add(indexedRecord);
+            } catch (IOException e) {
+              //ignore exception here.
+            }
+          }
+
+          @Override
+          public Integer finish() {
+            return count;
+          }
+        };
+
+    SimpleHoodieExecutor<HoodieRecord, Tuple2<HoodieRecord, 
Option<IndexedRecord>>, Integer> exec = null;
+
+    try {
+      exec = new SimpleHoodieExecutor(hoodieRecords.iterator(), consumer, 
getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA), 
getPreExecuteRunnable());
+      int result = exec.execute();
+      // It should buffer and write 100 records

Review Comment:
   There's no buffering happening here; let's update this comment accordingly



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to