yihua commented on a change in pull request #3741:
URL: https://github.com/apache/hudi/pull/3741#discussion_r734198400
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
##########
@@ -366,12 +367,13 @@ public HoodieActiveTimeline getActiveTimeline() {
/**
* Run Compaction on the table. Compaction arranges the data so that it is
optimized for data access.
*
- * @param context HoodieEngineContext
+ * @param context HoodieEngineContext
* @param compactionInstantTime Instant Time
+ * @param writeClient Write client
*/
public abstract HoodieWriteMetadata<O> compact(HoodieEngineContext context,
- String compactionInstantTime);
-
+ String compactionInstantTime,
+ AbstractHoodieWriteClient
writeClient);
Review comment:
I'm able to get rid of the write client here by moving the rollback
logic to `HoodieTable`. Fixed.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieDataCompactionHandler.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Interface for insert and update operations in compaction.
+ *
+ * @param <T> HoodieRecordPayload type.
+ */
+public interface HoodieDataCompactionHandler<T extends HoodieRecordPayload> {
Review comment:
Renamed.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
##########
@@ -18,39 +18,276 @@
package org.apache.hudi.table.action.compact;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.AbstractHoodieWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieAccumulator;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.CompactionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.io.IOUtils;
+import org.apache.hudi.table.HoodieDataCompactionHandler;
import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
import java.io.IOException;
import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
import java.util.Set;
+import java.util.stream.StreamSupport;
+
+import static java.util.stream.Collectors.toList;
/**
* A HoodieCompactor runs compaction on a hoodie table.
*/
-public interface HoodieCompactor<T extends HoodieRecordPayload, I, K, O>
extends Serializable {
+public abstract class HoodieCompactor<T extends HoodieRecordPayload, I, K, O>
implements Serializable {
Review comment:
Yes. I created two JIRA tickets here for code clean-up:
https://issues.apache.org/jira/browse/HUDI-2596
https://issues.apache.org/jira/browse/HUDI-2597
##########
File path:
hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
##########
@@ -62,6 +67,21 @@ public HoodieJavaEngineContext(Configuration conf,
TaskContextSupplier taskContextSupplier
super(new SerializableConfiguration(conf), taskContextSupplier);
}
+ @Override
+ public HoodieAccumulator createNewAccumulator() {
Review comment:
Renamed.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieDataCompactionHandler.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Interface for insert and update operations in compaction.
+ *
+ * @param <T> HoodieRecordPayload type.
+ */
+public interface HoodieDataCompactionHandler<T extends HoodieRecordPayload> {
Review comment:
JIRA ticket created here to track the record payload class redesigning:
https://issues.apache.org/jira/browse/HUDI-2598
##########
File path:
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
##########
@@ -97,15 +98,20 @@
HoodieEngineContext context,
String instantTime,
Option<Map<String, String>> extraMetadata) {
- BaseScheduleCompactionActionExecutor scheduleCompactionExecutor = new
FlinkScheduleCompactionActionExecutor(
- context, config, this, instantTime, extraMetadata);
+ ScheduleCompactionActionExecutor scheduleCompactionExecutor = new
ScheduleCompactionActionExecutor(
+ context, config, this, instantTime, extraMetadata,
+ new HoodieFlinkMergeOnReadTableCompactor());
return scheduleCompactionExecutor.execute();
}
@Override
- public HoodieWriteMetadata<List<WriteStatus>> compact(HoodieEngineContext
context, String compactionInstantTime) {
- throw new HoodieNotSupportedException("Compaction is supported as a
separate pipeline, "
- + "should not invoke directly through HoodieFlinkMergeOnReadTable");
+ public HoodieWriteMetadata<List<WriteStatus>> compact(
+ HoodieEngineContext context, String compactionInstantTime,
AbstractHoodieWriteClient writeClient) {
+ RunCompactionActionExecutor compactionExecutor = new
RunCompactionActionExecutor(
Review comment:
No, regarding the refactoring, I just moved the compaction code from
Flink write client to Flink table class.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
##########
@@ -18,39 +18,276 @@
package org.apache.hudi.table.action.compact;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.AbstractHoodieWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieAccumulator;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.CompactionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.io.IOUtils;
+import org.apache.hudi.table.HoodieDataCompactionHandler;
import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
import java.io.IOException;
import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
import java.util.Set;
+import java.util.stream.StreamSupport;
+
+import static java.util.stream.Collectors.toList;
/**
* A HoodieCompactor runs compaction on a hoodie table.
*/
-public interface HoodieCompactor<T extends HoodieRecordPayload, I, K, O>
extends Serializable {
+public abstract class HoodieCompactor<T extends HoodieRecordPayload, I, K, O>
implements Serializable {
+
+ private static final Logger LOG =
LogManager.getLogger(HoodieCompactor.class);
/**
- * Generate a new compaction plan for scheduling.
+ * Handles the compaction timeline based on the compaction instant before
actual compaction.
*
- * @param context HoodieEngineContext
- * @param hoodieTable Hoodie Table
- * @param config Hoodie Write Configuration
- * @param compactionCommitTime scheduled compaction commit time
- * @param fgIdsInPendingCompactions partition-fileId pairs for which
compaction is pending
- * @return Compaction Plan
- * @throws IOException when encountering errors
+ * @param table {@link HoodieTable} instance to use.
+ * @param pendingCompactionTimeline pending compaction timeline.
+ * @param compactionInstantTime compaction instant
+ * @param writeClient Write client.
*/
- HoodieCompactionPlan generateCompactionPlan(HoodieEngineContext context,
HoodieTable<T, I, K, O> hoodieTable, HoodieWriteConfig config,
- String compactionCommitTime,
Set<HoodieFileGroupId> fgIdsInPendingCompactions) throws IOException;
+ public abstract void preCompact(
+ HoodieTable table, HoodieTimeline pendingCompactionTimeline,
+ String compactionInstantTime, AbstractHoodieWriteClient writeClient);
/**
* Execute compaction operations and report back status.
*/
- O compact(HoodieEngineContext context, HoodieCompactionPlan compactionPlan,
HoodieTable<T, I, K, O> hoodieTable,
- HoodieWriteConfig config, String compactionInstantTime) throws
IOException;
+ public HoodieData<WriteStatus> compact(
+ HoodieEngineContext context, HoodieCompactionPlan compactionPlan,
+ HoodieTable table, HoodieWriteConfig config, String
compactionInstantTime,
+ HoodieDataCompactionHandler copyOnWriteTableOperation) {
Review comment:
Renamed to `compactionHandler` based on the class name.
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDDData.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.data;
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.client.utils.SparkMemoryUtils;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.function.SerializableFunction;
+
+import org.apache.spark.api.java.JavaRDD;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Holds a {@link JavaRDD} of objects.
+ *
+ * @param <T> type of object.
+ */
+public class HoodieJavaRDDData<T> extends HoodieData<T> {
Review comment:
Sounds good. Renamed.
##########
File path:
hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
##########
@@ -62,6 +67,21 @@ public HoodieJavaEngineContext(Configuration conf,
TaskContextSupplier taskContextSupplier
super(new SerializableConfiguration(conf), taskContextSupplier);
}
+ @Override
+ public HoodieAccumulator createNewAccumulator() {
+ return HoodieAtomicLongAccumulator.create();
+ }
+
+ @Override
+ public <T> HoodieData<T> createEmptyHoodieData() {
Review comment:
Renamed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]