xushiyan commented on code in PR #4913:
URL: https://github.com/apache/hudi/pull/4913#discussion_r1205548327
##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java:
##########
@@ -60,17 +63,28 @@ public FlinkAppendHandle(
Iterator<HoodieRecord<T>> recordItr,
TaskContextSupplier taskContextSupplier) {
super(config, instantTime, hoodieTable, partitionPath, fileId, recordItr,
taskContextSupplier);
- this.writeMarkers = WriteMarkersFactory.get(config.getMarkersType(),
hoodieTable, instantTime);
}
- @Override
- protected void createMarkerFile(String partitionPath, String dataFileName) {
- // In some rare cases, the task was pulled up again with same write file
name,
- // for e.g, reuse the small log files from last commit instant.
-
- // Just skip the marker creation if it already exists, the new data would
append to
- // the file directly.
- writeMarkers.createIfNotExists(partitionPath, dataFileName, getIOType());
+ protected HoodieLogFileWriteCallback getLogWriteCallback() {
+ return new DefaultHoodieLogFileWriteCallBack() {
Review Comment:
instead of using an anonymous class, why not use
`org.apache.hudi.io.HoodieWriteHandle.AppendLogWriteCallback` here?
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java:
##########
@@ -87,18 +88,58 @@ public void tearDown() throws Exception {
@Test
public void testMarkerBasedRollbackAppend() throws Exception {
+ tearDown();
+ tableType = HoodieTableType.MERGE_ON_READ;
+ setUp();
HoodieTestTable testTable = HoodieTestTable.of(metaClient);
String f0 = testTable.addRequestedCommit("000")
.getFileIdsWithBaseFilesInPartitions("partA").get("partA");
testTable.forCommit("001")
- .withMarkerFile("partA", f0, IOType.APPEND);
+ .withLogMarkerFile("partA", f0, IOType.CREATE);
HoodieTable hoodieTable = HoodieSparkTable.create(getConfig(), context,
metaClient);
List<HoodieRollbackRequest> rollbackRequests = new
MarkerBasedRollbackStrategy(hoodieTable, context, getConfig(),
"002").getRollbackRequests(new
HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION,
"001"));
assertEquals(1, rollbackRequests.size());
}
+ @Test
+ public void testMarkerBasedRollbackAppendWithLogFileMarkers() throws
Exception {
+ tearDown();
+ tableType = HoodieTableType.MERGE_ON_READ;
+ setUp();
+ // test append IO TYPE
+ HoodieTestTable testTable = HoodieTestTable.of(metaClient);
+ String f0 = testTable.addRequestedCommit("000")
+ .getFileIdWithLogFile("partA");
+ testTable.forCommit("001")
+ .withLogMarkerFile("partA", f0, IOType.APPEND);
+
+ HoodieTable hoodieTable = HoodieSparkTable.create(getConfig(), context,
metaClient);
+ List<HoodieRollbackRequest> rollbackRequests = new
MarkerBasedRollbackStrategy(hoodieTable, context, getConfig(),
+ "002").getRollbackRequests(new
HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION,
"001"));
+ assertEquals(1, rollbackRequests.size());
+ HoodieRollbackRequest rollbackRequest = rollbackRequests.get(0);
+ assertEquals("partA", rollbackRequest.getPartitionPath());
+ assertEquals(f0, rollbackRequest.getFileId());
+ assertEquals(0, rollbackRequest.getFilesToBeDeleted().size());
+ assertEquals(1, rollbackRequest.getLogBlocksToBeDeleted().size());
+
+ // test create IO TYPE
Review Comment:
this can be a parameterized test over the IO type
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -151,6 +154,7 @@ public HoodieAppendHandle(HoodieWriteConfig config, String
instantTime, HoodieTa
this.sizeEstimator = new DefaultSizeEstimator();
this.statuses = new ArrayList<>();
this.recordProperties.putAll(config.getProps());
+ this.writeMarkers = WriteMarkersFactory.get(config.getMarkersType(),
hoodieTable, instantTime);
Review Comment:
is this not being used anywhere else?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileWriteCallback.java:
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.log;
+
+import org.apache.hudi.common.model.HoodieLogFile;
+
+/**
+ * HoodieLogFileWriteCallback is trigger when specific log file operation
happen
+ */
+public interface HoodieLogFileWriteCallback {
Review Comment:
out of scope for this PR: would this be a good pattern to apply to base files as
well?
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java:
##########
@@ -752,6 +755,56 @@ private Pair<List<HoodieRecord>, List<HoodieRecord>>
twoUpsertCommitDataWithTwoP
return Pair.of(records, records2);
}
+ /**
+ * Since how markers are generated for log file changed in Version Six, we
regenerate markers in the way version zero do.
+ *
+ * @param table instance of {@link HoodieTable}
+ */
+ private void prepForUpgradeFromZeroToOne(HoodieTable table) throws
IOException {
+ List<HoodieInstant> instantsToBeParsed =
+ metaClient.getActiveTimeline()
+ .getCommitsTimeline()
+ .getInstantsAsStream()
+ .collect(Collectors.toList());
+ for (HoodieInstant instant : instantsToBeParsed) {
+ WriteMarkers writeMarkers =
+ WriteMarkersFactory.get(table.getConfig().getMarkersType(), table,
instant.getTimestamp());
+ Set<String> oldMarkers = writeMarkers.allMarkerFilePaths();
+ boolean hasAppendMarker = oldMarkers.stream().anyMatch(marker ->
marker.contains(IOType.APPEND.name())
+ || marker.contains(IOType.CREATE.name()));
+ if (hasAppendMarker) {
+ // delete all markers and regenerate
+ writeMarkers.deleteMarkerDir(table.getContext(), 2);
+ for (String oldMarker : oldMarkers) {
+ String typeStr = oldMarker.substring(oldMarker.lastIndexOf(".") + 1);
+ IOType type = IOType.valueOf(typeStr);
+ String partitionFilePath = WriteMarkers.stripMarkerSuffix(oldMarker);
+ Path fullFilePath = new Path(basePath, partitionFilePath);
+ String partitionPath = FSUtils.getRelativePartitionPath(new
Path(basePath), fullFilePath.getParent());
+ if (FSUtils.isBaseFile(fullFilePath)) {
+ writeMarkers.create(partitionPath, fullFilePath.getName(), type);
+ } else {
+ String fileId = FSUtils.getFileIdFromFilePath(fullFilePath);
+ String baseInstant =
FSUtils.getBaseCommitTimeFromLogPath(fullFilePath);
+ String writeToken = FSUtils.getWriteTokenFromLogPath(fullFilePath);
+ writeMarkers.create(partitionPath,
+ FSUtils.makeBaseFileName(baseInstant, writeToken, fileId,
table.getBaseFileFormat().getFileExtension()), type);
+ }
+ }
+ writeMarkers.allMarkerFilePaths()
+ .forEach(markerPath ->
assertFalse(markerPath.contains(HoodieLogFile.DELTA_EXTENSION)));
+ }
+ }
+ }
+
+ private void prepForDowngradeFromOneToZero() throws IOException {
Review Comment:
not used?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -273,4 +281,32 @@ protected static Option<IndexedRecord>
toAvroRecord(HoodieRecord record, Schema
return Option.empty();
}
}
+
+ protected class AppendLogWriteCallback extends
DefaultHoodieLogFileWriteCallBack {
+ // here we distinguish log files created from log files being appended.
Considering following scenario:
+ // An appending task write to log file.
+ // (1) append to existing file file_instant_writetoken1.log.1
+ // (2) rollover and create file file_instant_writetoken2.log.2
+ // Then this task failed and retry by a new task.
+ // (3) append to existing file file_instant_writetoken1.log.1
+ // (4) rollover and create file file_instant_writetoken3.log.2
+ // finally file_instant_writetoken2.log.2 should not be committed to hudi,
we use marker file to delete it.
+ // keep in mind that log file is not always fail-safe unless it never roll
over
+
+ @Override
+ public boolean preLogFileOpen(HoodieLogFile logFileToAppend) {
+ // here we use createIfNotExists because in some rare cases, the task
was pulled up again with same write file name,
+ // for e.g, reuse the small log files from last commit instant in flink.
Task retry in spark.
+ WriteMarkers writeMarkers =
WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime);
+ return writeMarkers.create(partitionPath, logFileToAppend.getFileName(),
IOType.APPEND,
Review Comment:
should this be `createIfNotExists()`?
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java:
##########
@@ -87,18 +88,58 @@ public void tearDown() throws Exception {
@Test
public void testMarkerBasedRollbackAppend() throws Exception {
+ tearDown();
+ tableType = HoodieTableType.MERGE_ON_READ;
+ setUp();
HoodieTestTable testTable = HoodieTestTable.of(metaClient);
String f0 = testTable.addRequestedCommit("000")
.getFileIdsWithBaseFilesInPartitions("partA").get("partA");
testTable.forCommit("001")
- .withMarkerFile("partA", f0, IOType.APPEND);
+ .withLogMarkerFile("partA", f0, IOType.CREATE);
HoodieTable hoodieTable = HoodieSparkTable.create(getConfig(), context,
metaClient);
List<HoodieRollbackRequest> rollbackRequests = new
MarkerBasedRollbackStrategy(hoodieTable, context, getConfig(),
"002").getRollbackRequests(new
HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION,
"001"));
assertEquals(1, rollbackRequests.size());
}
+ @Test
+ public void testMarkerBasedRollbackAppendWithLogFileMarkers() throws
Exception {
+ tearDown();
+ tableType = HoodieTableType.MERGE_ON_READ;
Review Comment:
looks a bit hacky. can the test class be properly torn down in `@AfterEach` instead?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/DefaultHoodieLogFileWriteCallBack.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.log;
+
+import org.apache.hudi.common.model.HoodieLogFile;
+
+/**
+ * A default implementation of {@link HoodieLogFileWriteCallback}
+ */
+public class DefaultHoodieLogFileWriteCallBack implements
HoodieLogFileWriteCallback {
Review Comment:
can we just put a `default` impl in the interface itself, so we don't need this
class?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]