guanziyue commented on code in PR #4913:
URL: https://github.com/apache/hudi/pull/4913#discussion_r1205732160
##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java:
##########
@@ -60,17 +63,28 @@ public FlinkAppendHandle(
Iterator<HoodieRecord<T>> recordItr,
TaskContextSupplier taskContextSupplier) {
super(config, instantTime, hoodieTable, partitionPath, fileId, recordItr,
taskContextSupplier);
- this.writeMarkers = WriteMarkersFactory.get(config.getMarkersType(),
hoodieTable, instantTime);
}
- @Override
- protected void createMarkerFile(String partitionPath, String dataFileName) {
- // In some rare cases, the task was pulled up again with same write file
name,
- // for e.g, reuse the small log files from last commit instant.
-
- // Just skip the marker creation if it already exists, the new data would
append to
- // the file directly.
- writeMarkers.createIfNotExists(partitionPath, dataFileName, getIOType());
+ protected HoodieLogFileWriteCallback getLogWriteCallback() {
+ return new DefaultHoodieLogFileWriteCallBack() {
Review Comment:
This implementation differs slightly from AppendLogWriteCallback: in Flink,
the preLogFileOpen method always returns true. Alternatively, I could extend
AppendLogWriteCallback and override only that method. Would that be preferable?
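
To make the alternative concrete, here is a minimal sketch of the "extend and override" option. All types below are hypothetical stand-ins, not the classes from this PR: the method signature, the marker set, and the assumption that the base callback returns false when a marker already exists are only illustrative.

```java
import java.util.HashSet;
import java.util.Set;

public class CallbackSketch {
  /** Hypothetical stand-in for the callback interface; the real signatures may differ. */
  interface LogWriteCallback {
    boolean preLogFileOpen(String logFileName);
  }

  /** Stand-in base callback. Assumption: it reports whether the marker was newly created. */
  static class AppendCallback implements LogWriteCallback {
    protected final Set<String> markers;

    AppendCallback(Set<String> markers) {
      this.markers = markers;
    }

    @Override
    public boolean preLogFileOpen(String logFileName) {
      // Set.add returns false when the marker is already present.
      return markers.add(logFileName);
    }
  }

  /** Flink-style variant: reuse the base class, override only preLogFileOpen. */
  static class FlinkCallback extends AppendCallback {
    FlinkCallback(Set<String> markers) {
      super(markers);
    }

    @Override
    public boolean preLogFileOpen(String logFileName) {
      // Create the marker if it does not exist, then always allow the open,
      // so a restarted task can append to an existing log file.
      markers.add(logFileName);
      return true;
    }
  }

  public static void main(String[] args) {
    LogWriteCallback flink = new FlinkCallback(new HashSet<>());
    System.out.println(flink.preLogFileOpen("f1.log"));
    System.out.println(flink.preLogFileOpen("f1.log"));
  }
}
```

With this shape, only the one differing method lives in the Flink handle, and any other behavior is inherited from the base callback.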
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileWriteCallback.java:
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.log;
+
+import org.apache.hudi.common.model.HoodieLogFile;
+
+/**
+ * HoodieLogFileWriteCallback is trigger when specific log file operation
happen
+ */
+public interface HoodieLogFileWriteCallback {
Review Comment:
I agree with this idea. In my view, a marker file is a mechanism for recording
Hudi's actions on files, so it would be better to bind marker operations to the
FileSystem method calls; that way, every file creation is under Hudi's control.
My one concern is that such a binding may block potential interactions between
marker files and non-filesystem mechanisms.
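
As a rough illustration of what "binding marker operations to the file-system call" could look like, here is a small sketch using java.nio instead of Hadoop's FileSystem. The class name, the ".marker" suffix, and the directory layout are all assumptions made for the example, not Hudi's actual marker format.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class MarkerBoundFileSystemSketch {
  private final Path dataDir;
  private final Path markerDir;

  MarkerBoundFileSystemSketch(Path dataDir, Path markerDir) {
    this.dataDir = dataDir;
    this.markerDir = markerDir;
  }

  /** Creates a data file; the marker is always written first, as part of the same call. */
  Path create(String fileName) throws IOException {
    Path marker = markerDir.resolve(fileName + ".marker"); // hypothetical naming
    if (Files.notExists(marker)) {
      Files.createFile(marker);
    }
    // Throws FileAlreadyExistsException if the data file is already there.
    return Files.createFile(dataDir.resolve(fileName));
  }

  boolean hasMarker(String fileName) {
    return Files.exists(markerDir.resolve(fileName + ".marker"));
  }

  public static void main(String[] args) throws IOException {
    Path root = Files.createTempDirectory("marker-sketch");
    Path data = Files.createDirectories(root.resolve("data"));
    Path markers = Files.createDirectories(root.resolve("markers"));
    MarkerBoundFileSystemSketch fs = new MarkerBoundFileSystemSketch(data, markers);
    Path created = fs.create("f1.log");
    System.out.println(Files.exists(created) && fs.hasMarker("f1.log"));
  }
}
```

Because callers can only create files through create(), no file appears without a marker. The flip side is the concern raised above: a marker mechanism that is not file-based has no natural place to plug in.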
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java:
##########
@@ -87,18 +88,58 @@ public void tearDown() throws Exception {
@Test
public void testMarkerBasedRollbackAppend() throws Exception {
+ tearDown();
+ tableType = HoodieTableType.MERGE_ON_READ;
+ setUp();
HoodieTestTable testTable = HoodieTestTable.of(metaClient);
String f0 = testTable.addRequestedCommit("000")
.getFileIdsWithBaseFilesInPartitions("partA").get("partA");
testTable.forCommit("001")
- .withMarkerFile("partA", f0, IOType.APPEND);
+ .withLogMarkerFile("partA", f0, IOType.CREATE);
HoodieTable hoodieTable = HoodieSparkTable.create(getConfig(), context,
metaClient);
List<HoodieRollbackRequest> rollbackRequests = new
MarkerBasedRollbackStrategy(hoodieTable, context, getConfig(),
"002").getRollbackRequests(new
HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION,
"001"));
assertEquals(1, rollbackRequests.size());
}
+ @Test
+ public void testMarkerBasedRollbackAppendWithLogFileMarkers() throws
Exception {
+ tearDown();
+ tableType = HoodieTableType.MERGE_ON_READ;
Review Comment:
I tried it, and tearDown() does run correctly. I agree with you. How about
moving all the MOR table tests in this class into a new test class?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.