This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6de9f5d [HUDI-819] Fix a bug with MergeOnReadLazyInsertIterable.
6de9f5d is described below
commit 6de9f5d9e5cb1f82d7c32d04b114e7d4a181619b
Author: satishkotha <[email protected]>
AuthorDate: Mon Apr 27 12:50:39 2020 -0700
[HUDI-819] Fix a bug with MergeOnReadLazyInsertIterable.
The variable declared here[1] shadows the protected statuses variable, so even though
Hudi writes the data, the resulting WriteStatus is never added to the completed section.
This can cause duplicate records to be written (#1540)
[1]
https://github.com/apache/incubator-hudi/blob/master/hudi-client/src/main/java/org/apache/hudi/execution/MergeOnReadLazyInsertIterable.java#L53
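
For readers unfamiliar with the failure mode, the sketch below (illustrative names only,
not the actual Hudi classes) shows how a local declaration that shadows a field silently
drops the collected write statuses:

    import java.util.ArrayList;
    import java.util.List;

    class InsertHandlerSketch {
      // Results collected here are what the caller reads back once consumption finishes.
      protected final List<String> statuses = new ArrayList<>();

      void consumeOneRecord(String record) {
        // BUG: this local declaration shadows the protected field above, so the
        // status added below is thrown away when the method returns.
        List<String> statuses = new ArrayList<>();
        statuses.add("wrote " + record);
      }

      List<String> getStatuses() {
        return statuses; // stays empty, so completed writes go unreported and may be retried
      }
    }

The patch removes the overriding MergeOnReadLazyInsertIterable entirely and instead
parameterizes a single LazyInsertIterable with a WriteHandleFactory (CreateHandleFactory
for base files, AppendHandleFactory for log files), so the status bookkeeping lives in one
place.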
---
.../hudi/execution/BulkInsertMapFunction.java | 2 +-
...InsertIterable.java => LazyInsertIterable.java} | 35 +++++-----
.../execution/MergeOnReadLazyInsertIterable.java | 74 ----------------------
.../org/apache/hudi/io/AppendHandleFactory.java | 36 +++++++++++
.../org/apache/hudi/io/CreateHandleFactory.java | 36 +++++++++++
.../org/apache/hudi/io/WriteHandleFactory.java | 35 ++++++++++
.../table/action/commit/CommitActionExecutor.java | 4 +-
.../deltacommit/DeltaCommitActionExecutor.java | 7 +-
.../execution/TestBoundedInMemoryExecutor.java | 4 +-
.../hudi/execution/TestBoundedInMemoryQueue.java | 4 +-
10 files changed, 138 insertions(+), 99 deletions(-)
diff --git a/hudi-client/src/main/java/org/apache/hudi/execution/BulkInsertMapFunction.java b/hudi-client/src/main/java/org/apache/hudi/execution/BulkInsertMapFunction.java
index 5d4391c..67c1d75 100644
--- a/hudi-client/src/main/java/org/apache/hudi/execution/BulkInsertMapFunction.java
+++ b/hudi-client/src/main/java/org/apache/hudi/execution/BulkInsertMapFunction.java
@@ -50,7 +50,7 @@ public class BulkInsertMapFunction<T extends HoodieRecordPayload>
@Override
public Iterator<List<WriteStatus>> call(Integer partition, Iterator<HoodieRecord<T>> sortedRecordItr) {
- return new CopyOnWriteLazyInsertIterable<>(sortedRecordItr, config, instantTime, hoodieTable,
+ return new LazyInsertIterable<>(sortedRecordItr, config, instantTime, hoodieTable,
fileIDPrefixes.get(partition), hoodieTable.getSparkTaskContextSupplier());
}
}
diff --git a/hudi-client/src/main/java/org/apache/hudi/execution/CopyOnWriteLazyInsertIterable.java b/hudi-client/src/main/java/org/apache/hudi/execution/LazyInsertIterable.java
similarity index 81%
rename from hudi-client/src/main/java/org/apache/hudi/execution/CopyOnWriteLazyInsertIterable.java
rename to hudi-client/src/main/java/org/apache/hudi/execution/LazyInsertIterable.java
index 8f98496..fe0d5c4 100644
--- a/hudi-client/src/main/java/org/apache/hudi/execution/CopyOnWriteLazyInsertIterable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/execution/LazyInsertIterable.java
@@ -28,7 +28,8 @@ import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.io.HoodieCreateHandle;
+import org.apache.hudi.io.CreateHandleFactory;
+import org.apache.hudi.io.WriteHandleFactory;
import org.apache.hudi.io.HoodieWriteHandle;
import org.apache.hudi.table.HoodieTable;
@@ -43,26 +44,34 @@ import java.util.function.Function;
/**
* Lazy Iterable, that writes a stream of HoodieRecords sorted by the partitionPath, into new files.
*/
-public class CopyOnWriteLazyInsertIterable<T extends HoodieRecordPayload>
+public class LazyInsertIterable<T extends HoodieRecordPayload>
extends LazyIterableIterator<HoodieRecord<T>, List<WriteStatus>> {
protected final HoodieWriteConfig hoodieConfig;
protected final String instantTime;
protected final HoodieTable<T> hoodieTable;
protected final String idPrefix;
- protected int numFilesWritten;
protected SparkTaskContextSupplier sparkTaskContextSupplier;
+ protected WriteHandleFactory<T> writeHandleFactory;
- public CopyOnWriteLazyInsertIterable(Iterator<HoodieRecord<T>> sortedRecordItr, HoodieWriteConfig config,
- String instantTime, HoodieTable<T> hoodieTable, String idPrefix,
- SparkTaskContextSupplier sparkTaskContextSupplier) {
+ public LazyInsertIterable(Iterator<HoodieRecord<T>> sortedRecordItr, HoodieWriteConfig config,
+ String instantTime, HoodieTable<T> hoodieTable, String idPrefix,
+ SparkTaskContextSupplier sparkTaskContextSupplier) {
+ this(sortedRecordItr, config, instantTime, hoodieTable, idPrefix, sparkTaskContextSupplier,
+ new CreateHandleFactory<>());
+ }
+
+ public LazyInsertIterable(Iterator<HoodieRecord<T>> sortedRecordItr, HoodieWriteConfig config,
+ String instantTime, HoodieTable<T> hoodieTable, String idPrefix,
+ SparkTaskContextSupplier sparkTaskContextSupplier,
+ WriteHandleFactory<T> writeHandleFactory) {
super(sortedRecordItr);
this.hoodieConfig = config;
this.instantTime = instantTime;
this.hoodieTable = hoodieTable;
this.idPrefix = idPrefix;
- this.numFilesWritten = 0;
this.sparkTaskContextSupplier = sparkTaskContextSupplier;
+ this.writeHandleFactory = writeHandleFactory;
}
// Used for caching HoodieRecord along with insertValue. We need this to offload computation work to buffering thread.
@@ -118,10 +127,6 @@ public class CopyOnWriteLazyInsertIterable<T extends HoodieRecordPayload>
@Override
protected void end() {}
- protected String getNextFileId(String idPfx) {
- return String.format("%s-%d", idPfx, numFilesWritten++);
- }
-
protected CopyOnWriteInsertHandler getInsertHandler() {
return new CopyOnWriteInsertHandler();
}
@@ -140,8 +145,8 @@ public class CopyOnWriteLazyInsertIterable<T extends HoodieRecordPayload>
final HoodieRecord insertPayload = payload.record;
// lazily initialize the handle, for the first time
if (handle == null) {
- handle = new HoodieCreateHandle(hoodieConfig, instantTime, hoodieTable, insertPayload.getPartitionPath(),
- getNextFileId(idPrefix), sparkTaskContextSupplier);
+ handle = writeHandleFactory.create(hoodieConfig, instantTime, hoodieTable, insertPayload.getPartitionPath(),
+ idPrefix, sparkTaskContextSupplier);
}
if (handle.canWrite(payload.record)) {
@@ -151,8 +156,8 @@ public class CopyOnWriteLazyInsertIterable<T extends HoodieRecordPayload>
// handle is full.
statuses.add(handle.close());
// Need to handle the rejected payload & open new handle
- handle = new HoodieCreateHandle(hoodieConfig, instantTime, hoodieTable, insertPayload.getPartitionPath(),
- getNextFileId(idPrefix), sparkTaskContextSupplier);
+ handle = writeHandleFactory.create(hoodieConfig, instantTime, hoodieTable, insertPayload.getPartitionPath(),
+ idPrefix, sparkTaskContextSupplier);
handle.write(insertPayload, payload.insertValue, payload.exception); // we should be able to write 1 payload.
}
}
diff --git a/hudi-client/src/main/java/org/apache/hudi/execution/MergeOnReadLazyInsertIterable.java b/hudi-client/src/main/java/org/apache/hudi/execution/MergeOnReadLazyInsertIterable.java
deleted file mode 100644
index 02a9ead..0000000
--- a/hudi-client/src/main/java/org/apache/hudi/execution/MergeOnReadLazyInsertIterable.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.execution;
-
-import org.apache.hudi.client.SparkTaskContextSupplier;
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.io.HoodieAppendHandle;
-import org.apache.hudi.table.HoodieTable;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * Lazy Iterable, that writes a stream of HoodieRecords sorted by the partitionPath, into new log files.
- */
-public class MergeOnReadLazyInsertIterable<T extends HoodieRecordPayload> extends CopyOnWriteLazyInsertIterable<T> {
-
- public MergeOnReadLazyInsertIterable(Iterator<HoodieRecord<T>> sortedRecordItr, HoodieWriteConfig config,
- String instantTime, HoodieTable<T> hoodieTable, String idPfx, SparkTaskContextSupplier sparkTaskContextSupplier) {
- super(sortedRecordItr, config, instantTime, hoodieTable, idPfx, sparkTaskContextSupplier);
- }
-
- @Override
- protected CopyOnWriteInsertHandler getInsertHandler() {
- return new MergeOnReadInsertHandler();
- }
-
- protected class MergeOnReadInsertHandler extends CopyOnWriteInsertHandler {
-
- @Override
- protected void consumeOneRecord(HoodieInsertValueGenResult<HoodieRecord> payload) {
- final HoodieRecord insertPayload = payload.record;
- List<WriteStatus> statuses = new ArrayList<>();
- // lazily initialize the handle, for the first time
- if (handle == null) {
- handle = new HoodieAppendHandle(hoodieConfig, instantTime, hoodieTable,
- insertPayload.getPartitionPath(), getNextFileId(idPrefix), sparkTaskContextSupplier);
- }
- if (handle.canWrite(insertPayload)) {
- // write the payload, if the handle has capacity
- handle.write(insertPayload, payload.insertValue, payload.exception);
- } else {
- // handle is full.
- handle.close();
- statuses.add(handle.getWriteStatus());
- // Need to handle the rejected payload & open new handle
- handle = new HoodieAppendHandle(hoodieConfig, instantTime, hoodieTable,
- insertPayload.getPartitionPath(), getNextFileId(idPrefix), sparkTaskContextSupplier);
- handle.write(insertPayload, payload.insertValue, payload.exception); // we should be able to write 1 payload.
- }
- }
- }
-
-}
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/AppendHandleFactory.java b/hudi-client/src/main/java/org/apache/hudi/io/AppendHandleFactory.java
new file mode 100644
index 0000000..4a5554b
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/io/AppendHandleFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+public class AppendHandleFactory<T extends HoodieRecordPayload> extends WriteHandleFactory<T> {
+
+ @Override
+ public HoodieAppendHandle<T> create(final HoodieWriteConfig hoodieConfig, final String commitTime,
+ final HoodieTable<T> hoodieTable, final String partitionPath,
+ final String fileIdPrefix, final SparkTaskContextSupplier sparkTaskContextSupplier) {
+
+ return new HoodieAppendHandle(hoodieConfig, commitTime, hoodieTable, partitionPath,
+ getNextFileId(fileIdPrefix), sparkTaskContextSupplier);
+ }
+}
\ No newline at end of file
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/CreateHandleFactory.java b/hudi-client/src/main/java/org/apache/hudi/io/CreateHandleFactory.java
new file mode 100644
index 0000000..68d8b4d
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/io/CreateHandleFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+public class CreateHandleFactory<T extends HoodieRecordPayload> extends WriteHandleFactory<T> {
+
+ @Override
+ public HoodieWriteHandle<T> create(final HoodieWriteConfig hoodieConfig, final String commitTime,
+ final HoodieTable<T> hoodieTable, final String partitionPath,
+ final String fileIdPrefix, SparkTaskContextSupplier sparkTaskContextSupplier) {
+
+ return new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, partitionPath,
+ getNextFileId(fileIdPrefix), sparkTaskContextSupplier);
+ }
+}
\ No newline at end of file
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/WriteHandleFactory.java b/hudi-client/src/main/java/org/apache/hudi/io/WriteHandleFactory.java
new file mode 100644
index 0000000..7039b71
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/io/WriteHandleFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+public abstract class WriteHandleFactory<T extends HoodieRecordPayload> {
+ private int numFilesWritten = 0;
+
+ public abstract HoodieWriteHandle<T> create(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable,
+ String partitionPath, String fileIdPrefix, SparkTaskContextSupplier sparkTaskContextSupplier);
+
+ protected String getNextFileId(String idPfx) {
+ return String.format("%s-%d", idPfx, numFilesWritten++);
+ }
+}
\ No newline at end of file
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/CommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/CommitActionExecutor.java
index b958837..5208c12 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/CommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/CommitActionExecutor.java
@@ -29,7 +29,7 @@ import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieUpsertException;
-import org.apache.hudi.execution.CopyOnWriteLazyInsertIterable;
+import org.apache.hudi.execution.LazyInsertIterable;
import org.apache.hudi.execution.SparkBoundedInMemoryExecutor;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.table.HoodieTable;
@@ -132,7 +132,7 @@ public abstract class CommitActionExecutor<T extends HoodieRecordPayload<T>>
LOG.info("Empty partition");
return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
}
- return new CopyOnWriteLazyInsertIterable<>(recordItr, config, instantTime, (HoodieTable<T>)table, idPfx,
+ return new LazyInsertIterable<>(recordItr, config, instantTime, (HoodieTable<T>)table, idPfx,
sparkTaskContextSupplier);
}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeltaCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeltaCommitActionExecutor.java
index 775580e..be3806e 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeltaCommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeltaCommitActionExecutor.java
@@ -24,8 +24,9 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieUpsertException;
-import org.apache.hudi.execution.MergeOnReadLazyInsertIterable;
+import org.apache.hudi.execution.LazyInsertIterable;
import org.apache.hudi.io.HoodieAppendHandle;
+import org.apache.hudi.io.AppendHandleFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
@@ -84,8 +85,8 @@ public abstract class DeltaCommitActionExecutor<T extends HoodieRecordPayload<T>
throws Exception {
// If canIndexLogFiles, write inserts to log files else write inserts to parquet files
if (table.getIndex().canIndexLogFiles()) {
- return new MergeOnReadLazyInsertIterable<>(recordItr, config, instantTime, (HoodieTable<T>)table, idPfx,
- sparkTaskContextSupplier);
+ return new LazyInsertIterable<>(recordItr, config, instantTime, (HoodieTable<T>)table, idPfx,
+ sparkTaskContextSupplier, new AppendHandleFactory<>());
} else {
return super.handleInsert(idPfx, recordItr);
}
diff --git a/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutor.java b/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutor.java
index 58f898c..1db248f 100644
--- a/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutor.java
+++ b/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutor.java
@@ -25,7 +25,7 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.execution.CopyOnWriteLazyInsertIterable.HoodieInsertValueGenResult;
+import org.apache.hudi.execution.LazyInsertIterable.HoodieInsertValueGenResult;
import org.apache.avro.generic.IndexedRecord;
import org.junit.After;
@@ -37,7 +37,7 @@ import java.util.List;
import scala.Tuple2;
-import static org.apache.hudi.execution.CopyOnWriteLazyInsertIterable.getTransformFunction;
+import static org.apache.hudi.execution.LazyInsertIterable.getTransformFunction;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
diff --git a/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java b/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java
index 72b3eff..859cf4e 100644
--- a/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java
+++ b/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java
@@ -31,7 +31,7 @@ import org.apache.hudi.common.util.queue.BoundedInMemoryQueueProducer;
import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.execution.CopyOnWriteLazyInsertIterable.HoodieInsertValueGenResult;
+import org.apache.hudi.execution.LazyInsertIterable.HoodieInsertValueGenResult;
import org.apache.avro.generic.IndexedRecord;
import org.junit.After;
@@ -53,7 +53,7 @@ import java.util.stream.IntStream;
import scala.Tuple2;
-import static org.apache.hudi.execution.CopyOnWriteLazyInsertIterable.getTransformFunction;
+import static org.apache.hudi.execution.LazyInsertIterable.getTransformFunction;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;