danny0405 commented on a change in pull request #3741:
URL: https://github.com/apache/hudi/pull/3741#discussion_r725797869
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
##########
@@ -18,39 +18,277 @@
package org.apache.hudi.table.action.compact;
+import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.AbstractHoodieWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieAccumulator;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.CompactionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.io.IOUtils;
+import org.apache.hudi.table.HoodieCopyOnWriteTableOperation;
import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
import java.io.IOException;
import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
import java.util.Set;
+import java.util.stream.StreamSupport;
+
+import static java.util.stream.Collectors.toList;
/**
* A HoodieCompactor runs compaction on a hoodie table.
*/
-public interface HoodieCompactor<T extends HoodieRecordPayload, I, K, O> extends Serializable {
+public abstract class HoodieCompactor<T extends HoodieRecordPayload, I, K, O> implements Serializable {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieCompactor.class);
/**
- * Generate a new compaction plan for scheduling.
+ * @param config Write config.
+ * @return the reader schema for {@link HoodieMergedLogRecordScanner}.
+ */
+ public abstract Schema getReaderSchema(HoodieWriteConfig config);
+
+ /**
+ * Updates the reader schema for actual compaction operations.
*
- * @param context HoodieEngineContext
- * @param hoodieTable Hoodie Table
- * @param config Hoodie Write Configuration
- * @param compactionCommitTime scheduled compaction commit time
- * @param fgIdsInPendingCompactions partition-fileId pairs for which compaction is pending
- * @return Compaction Plan
- * @throws IOException when encountering errors
+ * @param config Write config.
+ * @param metaClient {@link HoodieTableMetaClient} instance to use.
*/
-  HoodieCompactionPlan generateCompactionPlan(HoodieEngineContext context, HoodieTable<T, I, K, O> hoodieTable, HoodieWriteConfig config,
-      String compactionCommitTime, Set<HoodieFileGroupId> fgIdsInPendingCompactions) throws IOException;
+  public abstract void updateReaderSchema(HoodieWriteConfig config, HoodieTableMetaClient metaClient);
+
+ /**
+ * Handles the compaction timeline based on the compaction instant.
+ *
+ * @param table {@link HoodieTable} instance to use.
+ * @param pendingCompactionTimeline pending compaction timeline.
+ * @param compactionInstantTime compaction instant
+ * @param writeClient Write client.
+ */
+ public abstract void handleCompactionTimeline(
+ HoodieTable table, HoodieTimeline pendingCompactionTimeline,
Review comment:
`handleCompactionTimeline` -> `preCompact`?
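For illustration, a minimal sketch of the suggested rename (a sketch only: the parameter list is copied from the declaration above plus its javadoc, and the `AbstractHoodieWriteClient` type for `writeClient` is my assumption):

```java
/**
 * Hook that runs before the actual compaction, e.g. rolling back any
 * inflight compaction instants left on the pending compaction timeline.
 */
public abstract void preCompact(
    HoodieTable table, HoodieTimeline pendingCompactionTimeline,
    String compactionInstantTime, AbstractHoodieWriteClient writeClient);
```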
##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java
##########
@@ -75,164 +42,27 @@
* <p>Note: the compaction logic is invoked through the flink pipeline.
*/
@SuppressWarnings("checkstyle:LineLength")
-public class HoodieFlinkMergeOnReadTableCompactor<T extends HoodieRecordPayload> implements HoodieCompactor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
-
-  private static final Logger LOG = LogManager.getLogger(HoodieFlinkMergeOnReadTableCompactor.class);
-
- // Accumulator to keep track of total log files for a table
- private AtomicLong totalLogFiles;
- // Accumulator to keep track of total log file slices for a table
- private AtomicLong totalFileSlices;
+public class HoodieFlinkMergeOnReadTableCompactor<T extends HoodieRecordPayload>
+    extends HoodieCompactor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
@Override
-  public List<WriteStatus> compact(HoodieEngineContext context, HoodieCompactionPlan compactionPlan,
-      HoodieTable hoodieTable, HoodieWriteConfig config, String compactionInstantTime) throws IOException {
-    throw new UnsupportedOperationException("HoodieFlinkMergeOnReadTableCompactor does not support compact directly, "
-        + "the function works as a separate pipeline");
+ public Schema getReaderSchema(HoodieWriteConfig config) {
+    return HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField());
Review comment:
Spark should also include the `_hoodie_operation` field when it is turned on (it is off by default), so the logic is the same for Flink and Spark, and there is no need to abstract `getReaderSchema` out as a separate method to override.
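If so, a minimal sketch of a single shared implementation living in the base `HoodieCompactor` (the `protected` placement is my suggestion, not something in this PR):

```java
// Shared by Flink and Spark: add the Hudi metadata fields to the write schema,
// including the optional _hoodie_operation field when the config enables it.
protected Schema getReaderSchema(HoodieWriteConfig config) {
  return HoodieAvroUtils.addMetadataFields(
      new Schema.Parser().parse(config.getSchema()),
      config.allowOperationMetadataField());
}
```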
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/HoodieWriteMetadata.java
##########
@@ -46,6 +46,36 @@
public HoodieWriteMetadata() {
}
+ /**
+ * Clones the write metadata with transformed write statuses.
+ *
+ * @param transformedWriteStatuses transformed write statuses
+ * @param <T> type of transformed write statuses
+ * @return Cloned {@link HoodieWriteMetadata<T>} instance
+ */
+ public <T> HoodieWriteMetadata<T> clone(T transformedWriteStatuses) {
+ HoodieWriteMetadata<T> newMetadataInstance = new HoodieWriteMetadata<>();
+ newMetadataInstance.setWriteStatuses(transformedWriteStatuses);
+ if (indexLookupDuration.isPresent()) {
Review comment:
Why must Spark keep the write status list as an `RDD`? Is there a performance benefit? I guess the write status list is small and a plain Java list is enough.
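For context, a hedged sketch of how the new `clone` helper could bridge the two representations if a plain list is enough (the `toListMetadata` helper and the `getWriteStatuses()` accessor are assumptions on my part):

```java
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;

import java.util.List;

public class WriteMetadataConversions {
  /** Hypothetical helper: materialize RDD-backed write metadata into a list-backed copy. */
  public static HoodieWriteMetadata<List<WriteStatus>> toListMetadata(
      HoodieWriteMetadata<JavaRDD<WriteStatus>> rddMetadata) {
    // collect() pulls the (presumably small) write statuses to the driver.
    return rddMetadata.clone(rddMetadata.getWriteStatuses().collect());
  }
}
```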
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTableOperation.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Interface for insert and update operations in compaction.
Review comment:
`HoodieCopyOnWriteTableOperation` => `DataWriteHandler`?
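For illustration, the interface under the suggested name; the method shapes are inferred from the `handleUpdate`/`handleInsert` call sites in the compactor and may not match the PR exactly:

```java
/** Suggested rename of HoodieCopyOnWriteTableOperation (shape is illustrative). */
public interface DataWriteHandler<T extends HoodieRecordPayload> {

  Iterator<List<WriteStatus>> handleUpdate(String instantTime, String partitionPath, String fileId,
      Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile oldDataFile) throws IOException;

  Iterator<List<WriteStatus>> handleInsert(String instantTime, String partitionPath, String fileId,
      Map<String, HoodieRecord<T>> recordMap) throws IOException;
}
```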
##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java
##########
@@ -75,164 +42,27 @@
* <p>Note: the compaction logic is invoked through the flink pipeline.
*/
@SuppressWarnings("checkstyle:LineLength")
-public class HoodieFlinkMergeOnReadTableCompactor<T extends HoodieRecordPayload> implements HoodieCompactor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
-
-  private static final Logger LOG = LogManager.getLogger(HoodieFlinkMergeOnReadTableCompactor.class);
-
- // Accumulator to keep track of total log files for a table
- private AtomicLong totalLogFiles;
- // Accumulator to keep track of total log file slices for a table
- private AtomicLong totalFileSlices;
+public class HoodieFlinkMergeOnReadTableCompactor<T extends HoodieRecordPayload>
+    extends HoodieCompactor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
@Override
-  public List<WriteStatus> compact(HoodieEngineContext context, HoodieCompactionPlan compactionPlan,
-      HoodieTable hoodieTable, HoodieWriteConfig config, String compactionInstantTime) throws IOException {
-    throw new UnsupportedOperationException("HoodieFlinkMergeOnReadTableCompactor does not support compact directly, "
-        + "the function works as a separate pipeline");
+ public Schema getReaderSchema(HoodieWriteConfig config) {
+    return HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField());
}
-  public List<WriteStatus> compact(HoodieFlinkCopyOnWriteTable hoodieCopyOnWriteTable,
- HoodieTableMetaClient metaClient,
- HoodieWriteConfig config,
- CompactionOperation operation,
- String instantTime) throws IOException {
- FileSystem fs = metaClient.getFs();
-
-    Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField());
-    LOG.info("Compacting base " + operation.getDataFileName() + " with delta files " + operation.getDeltaFileNames()
-        + " for commit " + instantTime);
- // TODO - FIX THIS
-    // Reads the entire avro file. Always only specific blocks should be read from the avro file
- // (failure recover).
-    // Load all the delta commits since the last compaction commit and get all the blocks to be
- // loaded and load it using CompositeAvroLogReader
-    // Since a DeltaCommit is not defined yet, reading all the records. revisit this soon.
- String maxInstantTime = metaClient
-        .getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION,
-            HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION))
- .filterCompletedInstants().lastInstant().get().getTimestamp();
- // TODO(danny): make it configurable
-    long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new FlinkTaskContextSupplier(null), config);
- LOG.info("MaxMemoryPerCompaction => " + maxMemoryPerCompaction);
-
- List<String> logFiles = operation.getDeltaFileNames().stream().map(
-        p -> new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), operation.getPartitionPath()), p).toString())
- .collect(toList());
-    HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
- .withFileSystem(fs)
- .withBasePath(metaClient.getBasePath())
- .withLogFilePaths(logFiles)
- .withReaderSchema(readerSchema)
- .withLatestInstantTime(maxInstantTime)
- .withMaxMemorySizeInBytes(maxMemoryPerCompaction)
- .withReadBlocksLazily(config.getCompactionLazyBlockReadEnabled())
- .withReverseReader(config.getCompactionReverseLogReadEnabled())
- .withBufferSize(config.getMaxDFSStreamBufferSize())
- .withSpillableMapBasePath(config.getSpillableMapBasePath())
- .withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
-        .withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
- .build();
- if (!scanner.iterator().hasNext()) {
- return new ArrayList<>();
- }
-
- Option<HoodieBaseFile> oldDataFileOpt =
-        operation.getBaseFile(metaClient.getBasePath(), operation.getPartitionPath());
-
- // Compacting is very similar to applying updates to existing file
- Iterator<List<WriteStatus>> result;
-    // If the dataFile is present, perform updates else perform inserts into a new base file.
- if (oldDataFileOpt.isPresent()) {
-      result = hoodieCopyOnWriteTable.handleUpdate(instantTime, operation.getPartitionPath(),
- operation.getFileId(), scanner.getRecords(),
- oldDataFileOpt.get());
- } else {
-      result = hoodieCopyOnWriteTable.handleInsert(instantTime, operation.getPartitionPath(), operation.getFileId(),
- scanner.getRecords());
- }
- Iterable<List<WriteStatus>> resultIterable = () -> result;
-    return StreamSupport.stream(resultIterable.spliterator(), false).flatMap(Collection::stream).peek(s -> {
-      s.getStat().setTotalUpdatedRecordsCompacted(scanner.getNumMergedRecordsInLog());
- s.getStat().setTotalLogFilesCompacted(scanner.getTotalLogFiles());
- s.getStat().setTotalLogRecords(scanner.getTotalLogRecords());
- s.getStat().setPartitionPath(operation.getPartitionPath());
- s.getStat()
-          .setTotalLogSizeCompacted(operation.getMetrics().get(CompactionStrategy.TOTAL_LOG_FILE_SIZE).longValue());
- s.getStat().setTotalLogBlocks(scanner.getTotalLogBlocks());
- s.getStat().setTotalCorruptLogBlock(scanner.getTotalCorruptBlocks());
- s.getStat().setTotalRollbackBlocks(scanner.getTotalRollbacks());
- RuntimeStats runtimeStats = new RuntimeStats();
-      runtimeStats.setTotalScanTime(scanner.getTotalTimeTakenToReadAndMergeBlocks());
- s.getStat().setRuntimeStats(runtimeStats);
- scanner.close();
- }).collect(toList());
+ @Override
+  public void updateReaderSchema(HoodieWriteConfig config, HoodieTableMetaClient metaClient) {
+ // No OP
Review comment:
Flink also needs this action, so there is no need to abstract `updateReaderSchema` out as a separate method to override.
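If it is shared, a hedged sketch of a single base-class version (assuming the Spark side resolves the latest table schema through `TableSchemaResolver`; I have not verified the exact Spark logic in this PR):

```java
// Hypothetical shared version in HoodieCompactor: refresh the write schema
// from the table's latest commit before compacting, for both engines.
protected void updateReaderSchema(HoodieWriteConfig config, HoodieTableMetaClient metaClient) throws Exception {
  TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
  config.setSchema(schemaResolver.getTableAvroSchema(false).toString());
}
```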