vinothchandar commented on a change in pull request #1827:
URL: https://github.com/apache/hudi/pull/1827#discussion_r484134852



##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseMarkerBasedRollbackStrategy.java
##########
@@ -18,63 +18,58 @@
 
 package org.apache.hudi.table.action.rollback;
 
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.HoodieEngineContext;
 import org.apache.hudi.common.HoodieRollbackStat;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieLogFile;
-import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.MarkerFiles;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.apache.spark.api.java.JavaSparkContext;
 
 import java.io.IOException;
 import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 
-import scala.Tuple2;
-
 /**
  * Performs rollback using marker files generated during the write..
  */
-public class MarkerBasedRollbackStrategy implements 
BaseRollbackActionExecutor.RollbackStrategy {
+public abstract class BaseMarkerBasedRollbackStrategy<T extends 
HoodieRecordPayload, I, K, O, P> implements 
BaseRollbackActionExecutor.RollbackStrategy {

Review comment:
       These sort of classes, we should have a way to implement with just a 
reference to `engineContext` ideally. Even though we cannot implement every 
method in `sparkContext`. This is a topic for later

##########
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactionAdminTool.java
##########
@@ -60,38 +60,38 @@ public static void main(String[] args) throws Exception {
    */
   public void run(JavaSparkContext jsc) throws Exception {
     HoodieTableMetaClient metaClient = new 
HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.basePath);
-    try (CompactionAdminClient admin = new CompactionAdminClient(jsc, 
cfg.basePath)) {
+    try (HoodieSparkCompactionAdminClient admin = new 
HoodieSparkCompactionAdminClient(new HoodieSparkEngineContext(jsc), 
cfg.basePath)) {
       final FileSystem fs = FSUtils.getFs(cfg.basePath, 
jsc.hadoopConfiguration());
       if (cfg.outputPath != null && fs.exists(new Path(cfg.outputPath))) {
         throw new IllegalStateException("Output File Path already exists");
       }
       switch (cfg.operation) {
         case VALIDATE:
-          List<ValidationOpResult> res =
+          List<BaseCompactionAdminClient.ValidationOpResult> res =
               admin.validateCompactionPlan(metaClient, 
cfg.compactionInstantTime, cfg.parallelism);
           if (cfg.printOutput) {
             printOperationResult("Result of Validation Operation :", res);
           }
           serializeOperationResult(fs, res);
           break;
         case UNSCHEDULE_FILE:
-          List<RenameOpResult> r = admin.unscheduleCompactionFileId(
+          List<BaseCompactionAdminClient.RenameOpResult> r = 
admin.unscheduleCompactionFileId(
               new HoodieFileGroupId(cfg.partitionPath, cfg.fileId), 
cfg.skipValidation, cfg.dryRun);
           if (cfg.printOutput) {
             System.out.println(r);
           }
           serializeOperationResult(fs, r);
           break;
         case UNSCHEDULE_PLAN:
-          List<RenameOpResult> r2 = 
admin.unscheduleCompactionPlan(cfg.compactionInstantTime, cfg.skipValidation,
+          List<BaseCompactionAdminClient.RenameOpResult> r2 = 
admin.unscheduleCompactionPlan(cfg.compactionInstantTime, cfg.skipValidation,

Review comment:
       wondering if this renaming will have any impact on deserializing older 
plans. cc @bvaradar to confirm 

##########
File path: style/checkstyle.xml
##########
@@ -62,7 +62,7 @@
             <property name="allowNonPrintableEscapes" value="true"/>
         </module>
         <module name="LineLength">
-            <property name="max" value="200"/>
+            <property name="max" value="500"/>

Review comment:
       let's discuss this in a separate PR? 500 is a really large threshold

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseMarkerBasedRollbackStrategy.java
##########
@@ -132,32 +127,4 @@ private HoodieRollbackStat undoAppend(String 
appendBaseFilePath, HoodieInstant i
         .build();
   }
 
-  @Override
-  public List<HoodieRollbackStat> execute(HoodieInstant instantToRollback) {
-    try {
-      MarkerFiles markerFiles = new MarkerFiles(table, 
instantToRollback.getTimestamp());
-      List<String> markerFilePaths = markerFiles.allMarkerFilePaths();
-      int parallelism = Math.max(Math.min(markerFilePaths.size(), 
config.getRollbackParallelism()), 1);
-      return jsc.parallelize(markerFilePaths, parallelism)
-          .map(markerFilePath -> {

Review comment:
       would a `parallelDo(func, parallelism)` method in `HoodieEngineContext` 
help us avoid a lot of base/child class duplication of logic like this? 
   
   Most of clean, compact, rollback, restore etc can be implemented this way. 
Most of them just take a list, parallelize it, and execute some function, 
collect results and get the objects back

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
##########
@@ -64,8 +64,7 @@ class IncrementalRelation(val sqlContext: SQLContext,
     throw new HoodieException("Incremental view not implemented yet, for 
merge-on-read tables")
   }
   // TODO : Figure out a valid HoodieWriteConfig
-  private val hoodieTable = HoodieTable.create(metaClient, 
HoodieWriteConfig.newBuilder().withPath(basePath).build(),
-    sqlContext.sparkContext.hadoopConfiguration)
+  private val hoodieTable = 
HoodieSparkTable.create(HoodieWriteConfig.newBuilder().withPath(basePath).build(),
 new HoodieSparkEngineContext(new JavaSparkContext(sqlContext.sparkContext)) 
,metaClient)

Review comment:
       nit: no space before `,` and space after `, metaClient` 

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.rollback;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import org.apache.hudi.common.HoodieRollbackStat;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.table.HoodieTable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class RollbackUtils {
+
+  private static final Logger LOG = LogManager.getLogger(RollbackUtils.class);
+
+  static Map<HoodieLogBlock.HeaderMetadataType, String> generateHeader(String 
instantToRollback, String rollbackInstantTime) {
+    // generate metadata
+    Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>(3);
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, 
rollbackInstantTime);
+    header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, 
instantToRollback);
+    header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE,
+        
String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
+    return header;
+  }
+
+  /**
+   * Helper to merge 2 rollback-stats for a given partition.
+   *
+   * @param stat1 HoodieRollbackStat
+   * @param stat2 HoodieRollbackStat
+   * @return Merged HoodieRollbackStat
+   */
+  static HoodieRollbackStat mergeRollbackStat(HoodieRollbackStat stat1, 
HoodieRollbackStat stat2) {
+    
ValidationUtils.checkArgument(stat1.getPartitionPath().equals(stat2.getPartitionPath()));
+    final List<String> successDeleteFiles = new ArrayList<>();
+    final List<String> failedDeleteFiles = new ArrayList<>();
+    final Map<FileStatus, Long> commandBlocksCount = new HashMap<>();
+    final List<FileStatus> filesToRollback = new ArrayList<>();
+    
Option.ofNullable(stat1.getSuccessDeleteFiles()).ifPresent(successDeleteFiles::addAll);
+    
Option.ofNullable(stat2.getSuccessDeleteFiles()).ifPresent(successDeleteFiles::addAll);
+    
Option.ofNullable(stat1.getFailedDeleteFiles()).ifPresent(failedDeleteFiles::addAll);
+    
Option.ofNullable(stat2.getFailedDeleteFiles()).ifPresent(failedDeleteFiles::addAll);
+    
Option.ofNullable(stat1.getCommandBlocksCount()).ifPresent(commandBlocksCount::putAll);
+    
Option.ofNullable(stat2.getCommandBlocksCount()).ifPresent(commandBlocksCount::putAll);
+    return new HoodieRollbackStat(stat1.getPartitionPath(), 
successDeleteFiles, failedDeleteFiles, commandBlocksCount);
+  }
+
+  /**
+   * Generate all rollback requests that needs rolling back this action 
without actually performing rollback for COW table type.
+   * @param fs instance of {@link FileSystem} to use.
+   * @param basePath base path of interest.
+   * @param shouldAssumeDatePartitioning {@code true} if date partitioning 
should be assumed. {@code false} otherwise.
+   * @return {@link List} of {@link ListingBasedRollbackRequest}s thus 
collected.
+   */
+  public static List<ListingBasedRollbackRequest> 
generateRollbackRequestsByListingCOW(FileSystem fs, String basePath, boolean 
shouldAssumeDatePartitioning) {

Review comment:
       the MOR equivalent method got moved I guess

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/async/HoodieSparkAsyncCompactService.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.async;
+
+import org.apache.hudi.asyc.BaseAsyncCompactService;
+import org.apache.hudi.client.AbstractHoodieWriteClient;
+import org.apache.hudi.client.BaseCompactor;
+import org.apache.hudi.client.HoodieSparkCompactor;
+import org.apache.hudi.common.HoodieEngineContext;
+import org.apache.hudi.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.IntStream;
+
+public class HoodieSparkAsyncCompactService extends BaseAsyncCompactService {
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieSparkAsyncCompactService.class);
+
+  private transient JavaSparkContext jssc;
+  public HoodieSparkAsyncCompactService(HoodieEngineContext context, 
AbstractHoodieWriteClient client) {
+    super(context, client);
+    this.jssc = HoodieSparkEngineContext.getSparkContext(context);
+  }
+
+  public HoodieSparkAsyncCompactService(HoodieEngineContext context, 
AbstractHoodieWriteClient client, boolean runInDaemonMode) {
+    super(context, client, runInDaemonMode);
+    this.jssc = HoodieSparkEngineContext.getSparkContext(context);
+  }
+
+  @Override
+  protected BaseCompactor createCompactor(AbstractHoodieWriteClient client) {
+    return new HoodieSparkCompactor(client);
+  }
+
+  @Override
+  protected Pair<CompletableFuture, ExecutorService> startService() {
+    ExecutorService executor = 
Executors.newFixedThreadPool(maxConcurrentCompaction,
+        r -> {
+          Thread t = new Thread(r, "async_compact_thread");
+          t.setDaemon(isRunInDaemonMode());
+          return t;
+        });
+    return Pair.of(CompletableFuture.allOf(IntStream.range(0, 
maxConcurrentCompaction).mapToObj(i -> CompletableFuture.supplyAsync(() -> {
+      try {
+        // Set Compactor Pool Name for allowing users to prioritize compaction
+        LOG.info("Setting Spark Pool name for compaction to " + 
COMPACT_POOL_NAME);
+        jssc.setLocalProperty("spark.scheduler.pool", COMPACT_POOL_NAME);

Review comment:
       might make sense to move the COMPACT_POOL_NAME also to the child class

##########
File path: 
hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkParquetBootstrapDataProvider.java
##########
@@ -43,18 +43,18 @@
 /**
  * Spark Data frame based bootstrap input provider.
  */
-public class SparkParquetBootstrapDataProvider extends 
FullRecordBootstrapDataProvider {
+public class SparkParquetBootstrapDataProvider extends 
FullRecordBootstrapDataProvider<JavaRDD<HoodieRecord>> {
 
   private final transient SparkSession sparkSession;
 
   public SparkParquetBootstrapDataProvider(TypedProperties props,
-                                           JavaSparkContext jsc) {
-    super(props, jsc);
-    this.sparkSession = 
SparkSession.builder().config(jsc.getConf()).getOrCreate();
+                                           HoodieSparkEngineContext context) {
+    super(props, context);
+    this.sparkSession = 
SparkSession.builder().config(context.getJavaSparkContext().getConf()).getOrCreate();
   }
 
   @Override
-  public JavaRDD<HoodieRecord> generateInputRecordRDD(String tableName, String 
sourceBasePath,
+  public JavaRDD<HoodieRecord> generateInputRecord(String tableName, String 
sourceBasePath,

Review comment:
       rename: generateInputRecords

##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
##########
@@ -81,7 +82,9 @@
    */
   public static SparkConf getSparkConfForTest(String appName) {
     SparkConf sparkConf = new SparkConf().setAppName(appName)
-        .set("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer").setMaster("local[8]");
+        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+        .set("spark.driver.host","localhost")

Review comment:
       why was this change required?

##########
File path: style/checkstyle.xml
##########
@@ -62,7 +62,7 @@
             <property name="allowNonPrintableEscapes" value="true"/>
         </module>
         <module name="LineLength">
-            <property name="max" value="200"/>
+            <property name="max" value="500"/>

Review comment:
       why is this necessary for this PR? 

##########
File path: hudi-client/pom.xml
##########
@@ -68,6 +107,12 @@
   </build>
 
   <dependencies>
+    <!-- Scala -->
+    <dependency>
+      <groupId>org.scala-lang</groupId>

Review comment:
       should we limit scala to just the spark module? 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to