vinothchandar commented on a change in pull request #3741:
URL: https://github.com/apache/hudi/pull/3741#discussion_r732262882



##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieDataCompactionHandler.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Interface for insert and update operations in compaction.
+ *
+ * @param <T> HoodieRecordPayload type.
+ */
+public interface HoodieDataCompactionHandler<T extends HoodieRecordPayload> {

Review comment:
       Just `HoodieCompactionHandler`?

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDDData.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.data;
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.client.utils.SparkMemoryUtils;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.function.SerializableFunction;
+
+import org.apache.spark.api.java.JavaRDD;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Holds a {@link JavaRDD} of objects.
+ *
+ * @param <T> type of object.
+ */
+public class HoodieJavaRDDData<T> extends HoodieData<T> {

Review comment:
       I think it's okay to just use `HoodieJavaRDD` — drop the `Data` at the
end. More readable, IMO.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
##########
@@ -18,39 +18,276 @@
 
 package org.apache.hudi.table.action.compact;
 
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieCompactionOperation;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.AbstractHoodieWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieAccumulator;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.CompactionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.io.IOUtils;
+import org.apache.hudi.table.HoodieDataCompactionHandler;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Set;
+import java.util.stream.StreamSupport;
+
+import static java.util.stream.Collectors.toList;
 
 /**
  * A HoodieCompactor runs compaction on a hoodie table.
  */
-public interface HoodieCompactor<T extends HoodieRecordPayload, I, K, O> 
extends Serializable {
+public abstract class HoodieCompactor<T extends HoodieRecordPayload, I, K, O> 
implements Serializable {
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieCompactor.class);
 
   /**
-   * Generate a new compaction plan for scheduling.
+   * Handles the compaction timeline based on the compaction instant before 
actual compaction.
    *
-   * @param context HoodieEngineContext
-   * @param hoodieTable Hoodie Table
-   * @param config Hoodie Write Configuration
-   * @param compactionCommitTime scheduled compaction commit time
-   * @param fgIdsInPendingCompactions partition-fileId pairs for which 
compaction is pending
-   * @return Compaction Plan
-   * @throws IOException when encountering errors
+   * @param table                     {@link HoodieTable} instance to use.
+   * @param pendingCompactionTimeline pending compaction timeline.
+   * @param compactionInstantTime     compaction instant
+   * @param writeClient               Write client.
    */
-  HoodieCompactionPlan generateCompactionPlan(HoodieEngineContext context, 
HoodieTable<T, I, K, O> hoodieTable, HoodieWriteConfig config,
-                                              String compactionCommitTime, 
Set<HoodieFileGroupId> fgIdsInPendingCompactions) throws IOException;
+  public abstract void preCompact(
+      HoodieTable table, HoodieTimeline pendingCompactionTimeline,
+      String compactionInstantTime, AbstractHoodieWriteClient writeClient);
 
   /**
    * Execute compaction operations and report back status.
    */
-  O compact(HoodieEngineContext context, HoodieCompactionPlan compactionPlan, 
HoodieTable<T, I, K, O> hoodieTable,
-      HoodieWriteConfig config, String compactionInstantTime) throws 
IOException;
+  public HoodieData<WriteStatus> compact(
+      HoodieEngineContext context, HoodieCompactionPlan compactionPlan,
+      HoodieTable table, HoodieWriteConfig config, String 
compactionInstantTime,
+      HoodieDataCompactionHandler copyOnWriteTableOperation) {

Review comment:
       rename: copyOnWriteHandler?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
##########
@@ -18,39 +18,276 @@
 
 package org.apache.hudi.table.action.compact;
 
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieCompactionOperation;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.AbstractHoodieWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieAccumulator;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.CompactionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.io.IOUtils;
+import org.apache.hudi.table.HoodieDataCompactionHandler;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Set;
+import java.util.stream.StreamSupport;
+
+import static java.util.stream.Collectors.toList;
 
 /**
  * A HoodieCompactor runs compaction on a hoodie table.
  */
-public interface HoodieCompactor<T extends HoodieRecordPayload, I, K, O> 
extends Serializable {
+public abstract class HoodieCompactor<T extends HoodieRecordPayload, I, K, O> 
implements Serializable {

Review comment:
       Unrelated comment: in our code base, we should probably make it
consistent how we name interfaces and abstract classes. In some places it's
`HoodieXXX`, in some it's `AbstractXXX`, and in some it's `BaseXXX`. Maybe a
follow-up code cleanup JIRA?

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.data;
+
+import org.apache.hudi.common.function.SerializableFunction;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * An abstraction for a data collection of objects in type T to store the 
reference
+ * and do transformation.
+ *
+ * @param <T> type of object.
+ */
+public abstract class HoodieData<T> implements Serializable {
+  /**
+   * @return the collection of objects.
+   */
+  public abstract Object get();
+
+
+  /**
+   * @return whether the collection is empty.
+   */
+  public abstract boolean isEmpty();
+
+  /**
+   * Caches the data.
+   *
+   * @param properties config in properties.
+   */
+  public abstract void persist(Properties properties);

Review comment:
       +1 for that. its hard to avoid these at the API level.

##########
File path: 
hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
##########
@@ -62,6 +67,21 @@ public HoodieJavaEngineContext(Configuration conf, 
TaskContextSupplier taskConte
     super(new SerializableConfiguration(conf), taskContextSupplier);
   }
 
+  @Override
+  public HoodieAccumulator createNewAccumulator() {
+    return HoodieAtomicLongAccumulator.create();
+  }
+
+  @Override
+  public <T> HoodieData<T> createEmptyHoodieData() {

Review comment:
       Rename to `emptyData()`, just like `Collections.emptyList()`.

##########
File path: 
hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
##########
@@ -62,6 +67,21 @@ public HoodieJavaEngineContext(Configuration conf, 
TaskContextSupplier taskConte
     super(new SerializableConfiguration(conf), taskContextSupplier);
   }
 
+  @Override
+  public HoodieAccumulator createNewAccumulator() {

Review comment:
       rename: `newAccumulator()`

##########
File path: 
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
##########
@@ -97,15 +98,20 @@
       HoodieEngineContext context,
       String instantTime,
       Option<Map<String, String>> extraMetadata) {
-    BaseScheduleCompactionActionExecutor scheduleCompactionExecutor = new 
FlinkScheduleCompactionActionExecutor(
-        context, config, this, instantTime, extraMetadata);
+    ScheduleCompactionActionExecutor scheduleCompactionExecutor = new 
ScheduleCompactionActionExecutor(
+        context, config, this, instantTime, extraMetadata,
+        new HoodieFlinkMergeOnReadTableCompactor());
     return scheduleCompactionExecutor.execute();
   }
 
   @Override
-  public HoodieWriteMetadata<List<WriteStatus>> compact(HoodieEngineContext 
context, String compactionInstantTime) {
-    throw new HoodieNotSupportedException("Compaction is supported as a 
separate pipeline, "
-        + "should not invoke directly through HoodieFlinkMergeOnReadTable");
+  public HoodieWriteMetadata<List<WriteStatus>> compact(
+      HoodieEngineContext context, String compactionInstantTime, 
AbstractHoodieWriteClient writeClient) {
+    RunCompactionActionExecutor compactionExecutor = new 
RunCompactionActionExecutor(

Review comment:
       So we are adding compaction to Flink with this PR?

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDDData.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.data;
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.client.utils.SparkMemoryUtils;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.function.SerializableFunction;
+
+import org.apache.spark.api.java.JavaRDD;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Holds a {@link JavaRDD} of objects.
+ *
+ * @param <T> type of object.
+ */
+public class HoodieJavaRDDData<T> extends HoodieData<T> {
+
+  private final JavaRDD<T> rddData;
+
+  private HoodieJavaRDDData(JavaRDD<T> rddData) {
+    this.rddData = rddData;
+  }
+
+  /**
+   * @param rddData a {@link JavaRDD} of objects in type T.
+   * @param <T>     type of object.
+   * @return a new instance containing the {@link JavaRDD<T>} reference.
+   */
+  public static <T> HoodieJavaRDDData<T> of(JavaRDD<T> rddData) {
+    return new HoodieJavaRDDData<>(rddData);
+  }
+
+  /**
+   * @param data        a {@link List} of objects in type T.
+   * @param context     {@link HoodieSparkEngineContext} to use.
+   * @param parallelism parallelism for the {@link JavaRDD<T>}.
+   * @param <T>         type of object.
+   * @return a new instance containing the {@link JavaRDD<T>} instance.
+   */
+  public static <T> HoodieJavaRDDData<T> of(
+      List<T> data, HoodieSparkEngineContext context, int parallelism) {
+    return new 
HoodieJavaRDDData<>(context.getJavaSparkContext().parallelize(data, 
parallelism));
+  }
+
+  /**
+   * @param hoodieData {@link HoodieJavaRDDData<T>} instance containing the 
{@link JavaRDD} of objects.
+   * @param <T>        type of object.
+   * @return the a {@link JavaRDD} of objects in type T.
+   */
+  public static <T> JavaRDD<T> getJavaRDD(HoodieData<T> hoodieData) {
+    return ((HoodieJavaRDDData<T>) hoodieData).get();
+  }
+
+  @Override
+  public JavaRDD<T> get() {
+    return rddData;
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return rddData.isEmpty();
+  }
+
+  @Override
+  public void persist(Properties properties) {
+    rddData.persist(SparkMemoryUtils.getWriteStatusStorageLevel(properties));

Review comment:
       Is this specific to write status? I think we should do this more
generically. Right now, it's a bit misleading, IMO, that the method is called
`persist()`.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/HoodieWriteMetadata.java
##########
@@ -46,6 +46,36 @@
   public HoodieWriteMetadata() {
   }
 
+  /**
+   * Clones the write metadata with transformed write statuses.
+   *
+   * @param transformedWriteStatuses transformed write statuses
+   * @param <T>                      type of transformed write statuses
+   * @return Cloned {@link HoodieWriteMetadata<T>} instance
+   */
+  public <T> HoodieWriteMetadata<T> clone(T transformedWriteStatuses) {
+    HoodieWriteMetadata<T> newMetadataInstance = new HoodieWriteMetadata<>();
+    newMetadataInstance.setWriteStatuses(transformedWriteStatuses);
+    if (indexLookupDuration.isPresent()) {

Review comment:
       @danny0405 The reason is lazy evaluation in Spark, where we hand
back an RDD[WriteStatus] and the write is triggered only when you collect it.
The WriteStatus may actually contain failed records etc., which can be too large
for a single JVM/Java collection. This method gives you the option of
inspecting the RDD first and then collecting.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
##########
@@ -366,12 +367,13 @@ public HoodieActiveTimeline getActiveTimeline() {
   /**
    * Run Compaction on the table. Compaction arranges the data so that it is 
optimized for data access.
    *
-   * @param context HoodieEngineContext
+   * @param context               HoodieEngineContext
    * @param compactionInstantTime Instant Time
+   * @param writeClient           Write client
    */
   public abstract HoodieWriteMetadata<O> compact(HoodieEngineContext context,
-                                              String compactionInstantTime);
-
+                                                 String compactionInstantTime,
+                                                 AbstractHoodieWriteClient 
writeClient);

Review comment:
       Couldn't we call `table.rollback()` or something? Might be good to fix
that instead of leaking this in?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to