n3nash commented on a change in pull request #623: Hudi Test Suite
URL: https://github.com/apache/incubator-hudi/pull/623#discussion_r276527338
 
 

 ##########
 File path: hoodie-bench/src/main/java/com/uber/hoodie/integrationsuite/workload/WorkloadManager.java
 ##########
 @@ -0,0 +1,289 @@
+/*
+ *  Copyright (c) 2019 Uber Technologies, Inc. ([email protected])
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *           http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package com.uber.hoodie.integrationsuite.workload;
+
+import com.uber.hoodie.WriteStatus;
+import com.uber.hoodie.common.SerializableConfiguration;
+import com.uber.hoodie.common.model.HoodieCommitMetadata;
+import com.uber.hoodie.common.table.HoodieTableMetaClient;
+import com.uber.hoodie.common.table.timeline.HoodieInstant;
+import com.uber.hoodie.common.util.ReflectionUtils;
+import com.uber.hoodie.common.util.collection.Pair;
+import com.uber.hoodie.integrationsuite.configuration.WorkloadOperationConfig;
+import com.uber.hoodie.integrationsuite.generator.ComplexKeyGenerator;
+import com.uber.hoodie.integrationsuite.generator.WorkloadGenerator;
+import com.uber.hoodie.integrationsuite.job.HudiTestSuiteJob.HudiTestSuiteConfig;
+import com.uber.hoodie.integrationsuite.job.ThreadPoolService;
+import com.uber.hoodie.integrationsuite.job.action.Action;
+import com.uber.hoodie.integrationsuite.job.action.CompactAction;
+import com.uber.hoodie.integrationsuite.job.action.ValidateAction;
+import com.uber.hoodie.integrationsuite.job.action.result.ActionResult;
+import com.uber.hoodie.integrationsuite.job.action.result.BooleanResult;
+import com.uber.hoodie.integrationsuite.job.action.result.JavaRDDResult;
+import com.uber.hoodie.integrationsuite.job.action.result.StringResult;
+import com.uber.hoodie.integrationsuite.job.operation.OperationActionSequence;
+import com.uber.hoodie.integrationsuite.job.operation.WorkloadOperationSequence;
+import com.uber.hoodie.integrationsuite.sink.SinkFormat;
+import com.uber.hoodie.integrationsuite.sink.SinkType;
+import com.uber.hoodie.integrationsuite.sink.config.DFSSinkConfig;
+import com.uber.hoodie.integrationsuite.sink.writer.WriteStats;
+import com.uber.hoodie.integrationsuite.validate.IValidate;
+import com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer.Operation;
+import com.uber.hoodie.utilities.exception.HoodieWorkloadManagerException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * Manages the workload generation and execution sequence provided by an implementation of
+ * {@link com.uber.hoodie.integrationsuite.configuration.WorkloadOperationSequenceGenerator}.
+ */
+public class WorkloadManager {
+
+  private static final Logger log = Logger.getLogger(WorkloadManager.class);
+  private WorkloadGenerator workloadGenerator;
+  private SparkSession sparkSession;
+  private JavaSparkContext jsc;
+  private String schemaStr;
+  private Properties props;
+  private HudiTestSuiteConfig cfg;
+
+  public WorkloadManager(SparkSession sparkSession, Properties props, JavaSparkContext jsc, String schemaStr,
+      HudiTestSuiteConfig cfg) {
+    this.sparkSession = sparkSession;
+    this.props = props;
+    this.jsc = jsc;
+    this.schemaStr = schemaStr;
+    this.cfg = cfg;
+    this.workloadGenerator = new WorkloadGenerator(
+        new DFSSinkConfig(SinkType.valueOf(cfg.sinkTypeName), SinkFormat.valueOf(cfg.sinkFormatName),
+            new SerializableConfiguration(jsc.hadoopConfiguration()), cfg.inputBasePath, schemaStr, cfg.limitFileSize));
+  }
+
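+  // A minimal usage sketch (illustrative only; "sequence", "suiteConfig" and "keyGen" are
+  // hypothetical names for objects wired up by the surrounding test-suite job):
+  //
+  //   WorkloadManager manager = new WorkloadManager(spark, props, jsc, schemaStr, suiteConfig);
+  //   Pair<List<JavaRDD<WriteStats>>, List<Pair<JavaRDD<WriteStatus>, List<List<ActionResult>>>>> results =
+  //       manager.generateAndExecuteWorkload(sequence, suiteConfig, keyGen);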
+  public Pair<List<JavaRDD<WriteStats>>, List<Pair<JavaRDD<WriteStatus>, List<List<ActionResult>>>>>
+      generateAndExecuteWorkload(WorkloadOperationSequence workloadOperationSequence, HudiTestSuiteConfig config,
+      ComplexKeyGenerator keyGenerator) throws Exception {
+    final WorkloadWriter writer = new WorkloadWriter(jsc, props, cfg, schemaStr);
+    Pair<List<JavaRDD<WriteStats>>, List<Pair<JavaRDD<WriteStatus>, List<List<ActionResult>>>>> workloadResults =
+        generateWorkload(workloadOperationSequence, writer, config, keyGenerator);
+    workloadResults.getRight().addAll(executeWorkload(workloadOperationSequence, writer, config));
+    return workloadResults;
+  }
+
+  public Pair<List<JavaRDD<WriteStats>>, List<Pair<JavaRDD<WriteStatus>, List<List<ActionResult>>>>> generateWorkload(
+      WorkloadOperationSequence workloadOperationSequence, WorkloadWriter writer, HudiTestSuiteConfig config,
+      ComplexKeyGenerator keyGenerator) throws Exception {
+    List<JavaRDD<WriteStats>> workloadGenerationResult = new LinkedList<>();
+    List<Pair<JavaRDD<WriteStatus>, List<List<ActionResult>>>> workloadExecutionResult = new LinkedList<>();
+    int batchId = 0;
+    for (WorkloadOperationConfig workloadOperationConfig : workloadOperationSequence.getWorkloadOperationList()) {
+      int i = 0;
+      while (i < workloadOperationConfig.getRepeatCount()) {
+        JavaRDD<WriteStats> writeStats = generateSingleWorkload(workloadOperationConfig, keyGenerator, batchId);
+        workloadGenerationResult.add(writeStats);
+        i++;
+        batchId++;
+        if (workloadOperationSequence.isRunWorkloadInline()) {
+          // count() is a Spark action; it forces the lazily generated records to be
+          // materialized before the workload is executed inline.
+          if (writeStats != null) {
+            writeStats.count();
+          }
+          workloadExecutionResult.add(this.executeSingleWorkload(workloadOperationConfig, writer, config));
+        }
+      }
+    }
+    return Pair.of(workloadGenerationResult, workloadExecutionResult);
+  }
+
+  private JavaRDD<WriteStats> generateSingleWorkload(WorkloadOperationConfig workloadOperationConfig,
+      ComplexKeyGenerator keyGenerator, int batchId) throws IOException {
+    JavaRDD<WriteStats> writeStats = null;
+    switch (workloadOperationConfig.getOperation()) {
+      case INSERT:
+      case BULK_INSERT:
+        JavaRDD<GenericRecord> inserts = workloadGenerator.generateInserts(jsc, workloadOperationConfig,
+            schemaStr, keyGenerator.getPartitionPathFields());
+        writeStats = workloadGenerator.writeRecords(inserts, batchId);
+        break;
+      case UPSERT:
+        JavaRDD<GenericRecord> upserts = workloadGenerator.generateUpdates(jsc, sparkSession,
+            workloadOperationConfig, keyGenerator.getRecordKeyFields(), keyGenerator.getPartitionPathFields(),
+            schemaStr);
+        writeStats = workloadGenerator.writeRecords(upserts, batchId);
+        break;
+      case ROLLBACK:
+      case COMPACT:
+        // Rollback and compaction act on already-written data, so no new records are generated here.
+        break;
+      default:
+        throw new IllegalArgumentException("Invalid operation type " + workloadOperationConfig.getOperation());
+    }
+    return writeStats;
+  }
+
+  public List<Pair<JavaRDD<WriteStatus>, List<List<ActionResult>>>> executeWorkload(
+      WorkloadOperationSequence workloadOperationSequence, WorkloadWriter writer, HudiTestSuiteConfig config) {
+    List<Pair<JavaRDD<WriteStatus>, List<List<ActionResult>>>> workloadExecutionResult = new LinkedList<>();
+    if (!workloadOperationSequence.isRunWorkloadInline()) {
+      for (WorkloadOperationConfig workloadOperationConfig : workloadOperationSequence.getWorkloadOperationList()) {
+        workloadExecutionResult.add(executeSingleWorkload(workloadOperationConfig, writer, config));
+      }
+    }
+    return workloadExecutionResult;
+  }
+
+  // TODO : consolidate futures into a single class
+  // TODO : use Guava ListenableFutures to do post-process actions
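+  //
+  // A rough sketch of the ListenableFuture idea above (Guava API; names like "pool" and
+  // "actionsCallback" are hypothetical, and nothing here is wired into this class yet):
+  //
+  //   ListeningExecutorService pool =
+  //       MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numThreads));
+  //   ListenableFuture<JavaRDD<WriteStatus>> future =
+  //       pool.submit(() -> executeOperation(workloadOperationConfig, writer, config));
+  //   Futures.addCallback(future, actionsCallback, pool);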
+  private Pair<JavaRDD<WriteStatus>, List<List<ActionResult>>> executeSingleWorkload(
+      WorkloadOperationConfig workloadOperationConfig, WorkloadWriter writer, HudiTestSuiteConfig config) {
+    // Size the pool to run every dependent action sequence plus the operation itself concurrently.
+    ThreadPoolService poolService = new ThreadPoolService(workloadOperationConfig.getOperationActionSequences()
+        .orElse(new ArrayList<>()).size() + 1);
+    JavaRDD<WriteStatus> writeStatusRDD = null;
+    try {
+      if (workloadOperationConfig.isRunOperationAndActionsInParallel()) {
+        poolService.execute(() -> executeOperation(workloadOperationConfig, writer, config));
+      } else {
+        writeStatusRDD = executeOperation(workloadOperationConfig, writer, config);
+      }
+      if (workloadOperationConfig.getOperationActionSequences().isPresent()) {
+        for (OperationActionSequence operationActionSequence : workloadOperationConfig.getOperationActionSequences()
+            .get()) {
+          // When the operation runs in parallel, writeStatusRDD is still null at this point,
+          // so the dependent action sequence sees a null RDD.
+          final JavaRDD<WriteStatus> writeStatusRDDFinal = writeStatusRDD;
+          poolService.submit(() -> {
+            try {
+              return executeOperationActions(operationActionSequence, writeStatusRDDFinal,
+                  new WorkloadWriter(jsc, props, cfg, schemaStr, false));
+            } catch (Exception e) {
+              throw new HoodieWorkloadManagerException("unable to execute dependent operation sequence", e);
+            }
+          });
+        }
+      }
+    } catch (Exception e) {
+      throw new HoodieWorkloadManagerException("failed to execute operation 
actions", e);
+    }
+    List<List<ActionResult>> actionResults = (List<List<ActionResult>>) poolService.finishAll();
+    return Pair.of(writeStatusRDD, actionResults);
+  }
+
+  private JavaRDD<WriteStatus> executeOperation(WorkloadOperationConfig workloadOperationConfig, WorkloadWriter writer,
+      HudiTestSuiteConfig config) {
+    try {
+      JavaRDD<WriteStatus> writeStatusRDD = null;
+      Optional<String> instantTime = Optional.empty();
+      switch (workloadOperationConfig.getOperation()) {
+        case BULK_INSERT:
+          instantTime = writer.startCommit();
+          config.operation = Operation.BULK_INSERT;
+          writeStatusRDD = writer.bulkInsert(instantTime);
+          writer.commit(writeStatusRDD, instantTime);
+          break;
+        case INSERT:
+          instantTime = writer.startCommit();
+          config.operation = Operation.INSERT;
+          writeStatusRDD = writer.insert(instantTime);
+          writer.commit(writeStatusRDD, instantTime);
+          break;
+        case UPSERT:
+          instantTime = writer.startCommit();
+          config.operation = Operation.UPSERT;
+          writeStatusRDD = writer.upsert(instantTime);
+          writer.commit(writeStatusRDD, instantTime);
+          break;
+        case COMPACT:
+          config.operation = Operation.COMPACT;
+          writeStatusRDD = writer.compact(instantTime);
+          writer.commit(writeStatusRDD, instantTime);
+          break;
+        case ROLLBACK:
+          // Rollback requires instantiating a new WriteClient, so it cannot be exercised through
+          // the DeltaStreamer path for now.
+          HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.targetBasePath);
+          Optional<HoodieInstant> lastInstant = metaClient.getActiveTimeline().getCommitsTimeline().lastInstant();
+          if (lastInstant.isPresent()) {
+            writer.getWriteClient().rollback(lastInstant.get().getTimestamp());
+          } else {
+            log.warn("No instants to roll back in the timeline");
+          }
+          break;
+        default:
+          throw new IllegalArgumentException("Invalid operation type " + workloadOperationConfig.getOperation());
+      }
+      return writeStatusRDD;
+    } catch (Exception e) {
+      throw new HoodieWorkloadManagerException("Unable to execute operations 
", e);
+    }
+  }
+
+  private List<ActionResult> executeOperationActions(final OperationActionSequence operationActionSequence,
+      final JavaRDD<WriteStatus> writeStatusRDD, final WorkloadWriter writer) {
+
+    return operationActionSequence.getOperationActionSequence().stream().map(action -> {
+      Action actionType = action.getAction();
+      try {
+        switch (actionType) {
+          case COMPACT:
+            Optional<String> compactionInstant = ((CompactAction) action).getCompactionInstant();
+            JavaRDD<WriteStatus> ret = writer.compact(compactionInstant);
+            return new JavaRDDResult(ret);
+          case SCHEDULE_COMPACTION:
+            // Can only be done by instantiating a new WriteClient, so it cannot be exercised
+            // through the DeltaStreamer path for now.
+            // Find the last commit and extract the extra metadata to be passed to the schedule-compaction call. This is
 
 Review comment:
   fixed

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
