garyli1019 commented on a change in pull request #2553:
URL: https://github.com/apache/hudi/pull/2553#discussion_r571983074
##########
File path:
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
##########
@@ -111,7 +124,23 @@ public void bootstrap(Option<Map<String, String>> extraMetadata) {
getTableAndInitCtx(WriteOperationType.UPSERT, instantTime);
table.validateUpsertSchema();
preWrite(instantTime, WriteOperationType.UPSERT);
- HoodieWriteMetadata<List<WriteStatus>> result = table.upsert(context, instantTime, records);
+ // create the write handle if not exists
+ // create the write handle if not exists
Review comment:
Maybe move this comment to where the handle is actually created?
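As a hedged sketch of what "create the write handle if not exists" could look like (illustrative names, not Hudi's actual client code): the client keeps one handle per file ID and lazily creates it on the first record of a bucket.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of lazy per-bucket handle creation; the Handle type and
// the bucketHandles field are illustrative stand-ins, not Hudi's API.
public class HandleCacheSketch {
  interface Handle {}

  private final Map<String, Handle> bucketHandles = new HashMap<>();

  /** Returns the cached handle for fileId, creating it on first use. */
  Handle getOrCreate(String fileId, Function<String, Handle> factory) {
    return bucketHandles.computeIfAbsent(fileId, factory);
  }
}
```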
##########
File path:
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
##########
@@ -111,7 +124,23 @@ public void bootstrap(Option<Map<String, String>> extraMetadata) {
getTableAndInitCtx(WriteOperationType.UPSERT, instantTime);
table.validateUpsertSchema();
preWrite(instantTime, WriteOperationType.UPSERT);
- HoodieWriteMetadata<List<WriteStatus>> result = table.upsert(context, instantTime, records);
+ // create the write handle if not exists
+ // create the write handle if not exists
+ final HoodieRecord<T> record = records.get(0);
+ final HoodieRecordLocation loc = record.getCurrentLocation();
+ final String fileID = loc.getFileId();
+ final boolean isInsert = loc.getInstantTime().equals("I");
+ final HoodieWriteHandle<?, ?, ?, ?> writeHandle;
Review comment:
is this final?
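For reference, `final` on a local without an initializer is legal in Java as long as every path assigns it exactly once (the definite-assignment rule), which is presumably why `writeHandle` can be declared blank-final and then assigned per branch. A minimal sketch with hypothetical handle types:

```java
// Sketch of Java's definite-assignment rule for blank final locals; the
// Handle interface and "create"/"merge" kinds are illustrative, not Hudi's API.
public class FinalAssignmentSketch {
  interface Handle { String kind(); }

  static Handle pickHandle(boolean isInsert) {
    final Handle handle; // no initializer: legal as long as every branch assigns exactly once
    if (isInsert) {
      handle = () -> "create";
    } else {
      handle = () -> "merge";
    }
    return handle;
  }
}
```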
##########
File path:
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieInsertException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link HoodieCreateHandle} that supports incremental (mini-batch) create writes.
+ *
+ * <p>For the first mini-batch, it initializes and sets up the next file path to write,
+ * but does not close the file writer until all the mini-batches finish writing. Each mini-batch's
+ * data is appended to the same file.
+ *
+ * @param <T> Payload type
+ * @param <I> Input type
+ * @param <K> Key type
+ * @param <O> Output type
+ */
+public class FlinkCreateHandle<T extends HoodieRecordPayload, I, K, O>
+    extends HoodieCreateHandle<T, I, K, O> implements MiniBatchHandle {
+
+  private static final Logger LOG = LogManager.getLogger(FlinkCreateHandle.class);
+  private long lastFileSize = 0L;
+
+  public FlinkCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
+                           String partitionPath, String fileId, TaskContextSupplier taskContextSupplier) {
+    this(config, instantTime, hoodieTable, partitionPath, fileId, getWriterSchemaIncludingAndExcludingMetadataPair(config),
+        taskContextSupplier);
+  }
+
+  public FlinkCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
+                           String partitionPath, String fileId, Pair<Schema, Schema> writerSchemaIncludingAndExcludingMetadataPair,
+                           TaskContextSupplier taskContextSupplier) {
+    super(config, instantTime, hoodieTable, partitionPath, fileId, writerSchemaIncludingAndExcludingMetadataPair,
+        taskContextSupplier);
+  }
+
+  /**
+   * Called by the compactor code path.
+   */
+  public FlinkCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
+                           String partitionPath, String fileId, Map<String, HoodieRecord<T>> recordMap,
+                           TaskContextSupplier taskContextSupplier) {
+    super(config, instantTime, hoodieTable, partitionPath, fileId, recordMap, taskContextSupplier);
+  }
+
+  /**
+   * Get the incremental write status. In mini-batch write mode,
+   * this handle is reused for a checkpoint bucket (the bucket is appended to in mini-batches);
+   * thus, after a mini-batch append finishes, we do not close the underlying writer but return
+   * the incremental WriteStatus instead.
+   *
+   * @return the incremental write status
+   */
+  private WriteStatus incWriteStatus() {
Review comment:
Not quite sure how this would work; is this part of the RFC doc?
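One possible reading of the incremental-status idea, as a hedged sketch (field and method names are illustrative, not Hudi's API): the writer is kept open across mini-batches, and each status call reports only the delta since the previous call.

```java
// Hypothetical sketch of an "incremental write status": the underlying writer
// is never closed between mini-batches; each incStatus() call returns only the
// records written since the last call. Names here are illustrative.
public class IncStatusSketch {
  private long totalWritten = 0;
  private long lastReported = 0;

  void write(int records) {
    totalWritten += records; // append a mini-batch without closing the writer
  }

  /** Returns the number of records written since the previous status call. */
  long incStatus() {
    long delta = totalWritten - lastReported;
    lastReported = totalWritten;
    return delta;
  }
}
```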
##########
File path:
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.DefaultSizeEstimator;
+import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link HoodieMergeHandle} that supports incremental (mini-batch) merge writes.
+ *
+ * <p>For a new mini-batch, it initializes and sets up the next file path to write,
+ * and closes the file when the mini-batch write finishes. When the next mini-batch
+ * write starts, it rolls over to another new file. When all the mini-batch writes finish
+ * for a checkpoint round, it renames the last new file path to the desired file name
+ * (the name with the expected file ID).
+ *
+ * @param <T> Payload type
+ * @param <I> Input type
+ * @param <K> Key type
+ * @param <O> Output type
+ */
+public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
+    extends HoodieMergeHandle<T, I, K, O>
+    implements MiniBatchHandle {
+
+  private static final Logger LOG = LogManager.getLogger(FlinkMergeHandle.class);
+
+  /**
+   * Records the number of file handles rolled over so far.
+   */
+  private int rollNumber = 0;
+  /**
+   * Records the rolled-over file paths.
+   */
+  private List<Path> rolloverPaths;
+  /**
+   * Whether this is the first time generating a file handle, i.e. the handle has not rolled over yet.
+   */
+  private boolean needBootStrap = true;
+
+  public FlinkMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
+                          Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
+                          TaskContextSupplier taskContextSupplier) {
+    super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier);
+    rolloverPaths = new ArrayList<>();
+  }
+
+  /**
+   * Called by the compactor code path.
+   */
+  public FlinkMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
+                          Map<String, HoodieRecord<T>> keyToNewRecords, String partitionPath, String fileId,
+                          HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier) {
+    super(config, instantTime, hoodieTable, keyToNewRecords, partitionPath, fileId,
+        dataFileToBeMerged, taskContextSupplier);
+  }
+
+  /**
+   * Use fileId + "-" + rollNumber as the new fileId of a mini-batch write.
+   */
+  protected String dataFileName() {
+    return FSUtils.makeDataFileName(instantTime, writeToken, fileId + "-" + rollNumber, hoodieTable.getBaseFileExtension());
+  }
+
+  public boolean isNeedBootStrap() {
+    return needBootStrap;
+  }
+
+  @Override
+  public List<WriteStatus> close() {
+    List<WriteStatus> writeStatus = super.close();
+    this.needBootStrap = false;
+    return writeStatus;
+  }
+
+  /**
+   * The difference from the parent method is that there is no need to set up
+   * locations for the records.
+   *
+   * @param fileId        The file ID
+   * @param newRecordsItr The incremental records iterator
+   */
+  @Override
+  protected void init(String fileId, Iterator<HoodieRecord<T>> newRecordsItr) {
+    try {
+      // Load the new records into a map
+      long memoryForMerge = IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config.getProps());
+      LOG.info("MaxMemoryPerPartitionMerge => " + memoryForMerge);
+      this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge, config.getSpillableMapBasePath(),
+          new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(writerSchema));
+    } catch (IOException io) {
+      throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io);
+    }
+    while (newRecordsItr.hasNext()) {
+      HoodieRecord<T> record = newRecordsItr.next();
+      // NOTE: Once records are added to the map (spillable-map), DO NOT change them as the changes won't persist
+      keyToNewRecords.put(record.getRecordKey(), record);
+    }
+    LOG.info("Number of entries in MemoryBasedMap => "
+        + ((ExternalSpillableMap) keyToNewRecords).getInMemoryMapNumEntries()
+        + ", Total size in bytes of MemoryBasedMap => "
+        + ((ExternalSpillableMap) keyToNewRecords).getCurrentInMemoryMapSize()
+        + ", Number of entries in DiskBasedMap => "
+        + ((ExternalSpillableMap) keyToNewRecords).getDiskBasedMapNumEntries()
+        + ", Size of file spilled to disk => "
+        + ((ExternalSpillableMap) keyToNewRecords).getSizeOfFileOnDiskInBytes());
+  }
+
+  /**
+   * Rolls over the write handle to prepare for the next mini-batch write.
+   *
+   * <p>It tweaks the handle state as follows:
+   *
+   * <ul>
+   *   <li>Increments the {@code rollNumber}</li>
+   *   <li>Book-keeps the last file path; these files (except the last one) are temporary and need to be cleaned</li>
+   *   <li>Makes the last new file path the old one</li>
+   *   <li>Initializes the new file path and file writer</li>
+   * </ul>
+   *
+   * @param newRecordsItr The records iterator to update
+   */
+  public void rollOver(Iterator<HoodieRecord<T>> newRecordsItr) {
+    init(this.fileId, newRecordsItr);
+    this.recordsWritten = 0;
+    this.recordsDeleted = 0;
+    this.updatedRecordsWritten = 0;
+    this.insertRecordsWritten = 0;
+    this.writeStatus.setTotalErrorRecords(0);
+    this.timer = new HoodieTimer().startTimer();
+
+    rollNumber++;
+
+    rolloverPaths.add(newFilePath);
+    oldFilePath = newFilePath;
+    // Use fileId + "-" + rollNumber as the new fileId of the mini-batch write.
+    String newFileName = dataFileName();
+    String relativePath = new Path((partitionPath.isEmpty() ? "" : partitionPath + "/") + newFileName).toString();
+    newFilePath = new Path(config.getBasePath(), relativePath);
+
+    try {
+      fileWriter = createNewFileWriter(instantTime, newFilePath, hoodieTable, config, writerSchemaWithMetafields, taskContextSupplier);
+    } catch (IOException e) {
+      throw new HoodieIOException("Error when creating file writer for path " + newFilePath, e);
+    }
+
+    LOG.info(String.format("Merging new data into oldPath %s, as newPath %s", oldFilePath.toString(), newFilePath.toString()));
+  }
+
+  public void finishWrite() {
+    for (int i = 0; i < rolloverPaths.size() - 1; i++) {
+      Path path = rolloverPaths.get(i);
+      try {
+        fs.delete(path, false);
+      } catch (IOException e) {
+        throw new HoodieIOException("Error when cleaning the temporary rollover file: " + path, e);
+      }
+    }
+    Path lastPath = rolloverPaths.size() > 0
+        ? rolloverPaths.get(rolloverPaths.size() - 1)
+        : newFilePath;
+    String newFileName = FSUtils.makeDataFileName(instantTime, writeToken, fileId, hoodieTable.getBaseFileExtension());
+    String relativePath = new Path((partitionPath.isEmpty() ? "" : partitionPath + "/") + newFileName).toString();
+    final Path desiredPath = new Path(config.getBasePath(), relativePath);
+    try {
+      fs.rename(lastPath, desiredPath);
Review comment:
Rename is not friendly to cloud object stores; can we avoid the rename?
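For context, a minimal local-filesystem sketch of what the finalize-by-rename step amounts to: on HDFS a rename is a cheap metadata operation, but on S3-like object stores it degrades to a copy plus delete over the whole file, which is the cost the comment above warns about. The file names below are hypothetical examples of the `<fileId>-<rollNumber>` scheme.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Sketch only: java.nio on a local filesystem, standing in for fs.rename().
// Paths and file names are illustrative, not Hudi's actual layout.
public class FinalizeSketch {

  static void finalizeByRename(Path lastRolloverFile, Path desiredPath) throws IOException {
    // One rename per checkpoint round: cheap locally, O(file size) on object stores.
    Files.move(lastRolloverFile, desiredPath, StandardCopyOption.REPLACE_EXISTING);
  }

  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("finalize-sketch");
    Path tmp = Files.writeString(dir.resolve("f1-2.parquet"), "mini-batch data");
    finalizeByRename(tmp, dir.resolve("f1.parquet"));
  }
}
```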
##########
File path:
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieInsertException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link HoodieCreateHandle} that supports incremental (mini-batch) create writes.
+ *
+ * <p>For the first mini-batch, it initializes and sets up the next file path to write,
+ * but does not close the file writer until all the mini-batches finish writing. Each mini-batch's
+ * data is appended to the same file.
+ *
+ * @param <T> Payload type
+ * @param <I> Input type
+ * @param <K> Key type
+ * @param <O> Output type
+ */
+public class FlinkCreateHandle<T extends HoodieRecordPayload, I, K, O>
+    extends HoodieCreateHandle<T, I, K, O> implements MiniBatchHandle {
+
+  private static final Logger LOG = LogManager.getLogger(FlinkCreateHandle.class);
+  private long lastFileSize = 0L;
+
+  public FlinkCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
+                           String partitionPath, String fileId, TaskContextSupplier taskContextSupplier) {
+    this(config, instantTime, hoodieTable, partitionPath, fileId, getWriterSchemaIncludingAndExcludingMetadataPair(config),
+        taskContextSupplier);
+  }
+
+  public FlinkCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
+                           String partitionPath, String fileId, Pair<Schema, Schema> writerSchemaIncludingAndExcludingMetadataPair,
+                           TaskContextSupplier taskContextSupplier) {
+    super(config, instantTime, hoodieTable, partitionPath, fileId, writerSchemaIncludingAndExcludingMetadataPair,
+        taskContextSupplier);
+  }
+
+  /**
+   * Called by the compactor code path.
+   */
+  public FlinkCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
+                           String partitionPath, String fileId, Map<String, HoodieRecord<T>> recordMap,
+                           TaskContextSupplier taskContextSupplier) {
+    super(config, instantTime, hoodieTable, partitionPath, fileId, recordMap, taskContextSupplier);
+  }
+
+  /**
+   * Get the incremental write status. In mini-batch write mode,
+   * this handle is reused for a checkpoint bucket (the bucket is appended to in mini-batches),
Review comment:
Is this `checkpoint bucket` referring to the `BucketWriter`?
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteFunction.java
##########
@@ -58,33 +61,39 @@
* <p><h2>Work Flow</h2>
*
* <p>The function firstly buffers the data as a batch of {@link HoodieRecord}s,
- * It flushes(write) the records batch when a Flink checkpoint starts. After a batch has been written successfully,
+ * It flushes(write) the records batch when a batch exceeds the configured size {@link FlinkOptions#WRITE_BATCH_SIZE}
+ * or a Flink checkpoint starts. After a batch has been written successfully,
* the function notifies its operator coordinator {@link StreamWriteOperatorCoordinator} to mark a successful write.
*
- * <p><h2>Exactly-once Semantics</h2>
+ * <p><h2>The Semantics</h2>
*
* <p>The task implements exactly-once semantics by buffering the data between checkpoints. The operator coordinator
Review comment:
no exactly-once anymore?
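The flush-trigger change in the diff above can be sketched as follows (illustrative, not the actual `StreamWriteFunction` code): the buffer now flushes either when it exceeds a configured batch size or when a checkpoint starts, instead of only at checkpoints, which is presumably what prompts the question about exactly-once.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the two flush triggers; field names are illustrative.
public class FlushTriggerSketch {
  private final List<String> buffer = new ArrayList<>();
  private final int batchSize;
  int flushes = 0;

  FlushTriggerSketch(int batchSize) { this.batchSize = batchSize; }

  void processElement(String record) {
    buffer.add(record);
    if (buffer.size() >= batchSize) {
      flush(); // size-based flush: can now happen between checkpoints
    }
  }

  void snapshotState() {
    flush(); // checkpoint still flushes whatever remains
  }

  private void flush() {
    buffer.clear();
    flushes++;
  }
}
```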
##########
File path:
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.DefaultSizeEstimator;
+import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link HoodieMergeHandle} that supports incremental (mini-batch) merge writes.
+ *
+ * <p>For a new mini-batch, it initializes and sets up the next file path to write,
+ * and closes the file when the mini-batch write finishes. When the next mini-batch
+ * write starts, it rolls over to another new file. When all the mini-batch writes finish
+ * for a checkpoint round, it renames the last new file path to the desired file name
+ * (the name with the expected file ID).
+ *
Review comment:
Sorry, I am a bit confused by mini-batch, checkpoint, file id, etc.
We shuffle by file id before the `bucketWriter`, so each `bucketWriter` should handle one file id in each commit, right? Do you mean we will write multiple files for one commit (checkpoint) in a single `bucketWriter`?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]