vinothchandar commented on a change in pull request #623: Hudi Test Suite
URL: https://github.com/apache/incubator-hudi/pull/623#discussion_r276857408
 
 

 ##########
 File path: hoodie-bench/src/main/java/com/uber/hoodie/integrationsuite/job/HudiTestSuiteJob.java
 ##########
 @@ -0,0 +1,167 @@
+/*
+ *  Copyright (c) 2019 Uber Technologies, Inc. ([email protected])
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *           http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package com.uber.hoodie.integrationsuite.job;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.uber.hoodie.DataSourceUtils;
+import com.uber.hoodie.WriteStatus;
+import com.uber.hoodie.common.table.HoodieTableMetaClient;
+import com.uber.hoodie.common.util.FSUtils;
+import com.uber.hoodie.common.util.ReflectionUtils;
+import com.uber.hoodie.common.util.TypedProperties;
+import com.uber.hoodie.common.util.collection.Pair;
+import com.uber.hoodie.integrationsuite.configuration.WorkloadOperationSequenceGenerator;
+import com.uber.hoodie.integrationsuite.generator.ComplexKeyGenerator;
+import com.uber.hoodie.integrationsuite.job.action.result.ActionResult;
+import com.uber.hoodie.integrationsuite.job.operation.WorkloadOperationSequence;
+import com.uber.hoodie.integrationsuite.sink.SinkFormat;
+import com.uber.hoodie.integrationsuite.sink.SinkType;
+import com.uber.hoodie.integrationsuite.sink.writer.WriteStats;
+import com.uber.hoodie.integrationsuite.workload.WorkloadManager;
+import com.uber.hoodie.utilities.UtilHelpers;
+import com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer;
+import com.uber.hoodie.utilities.schema.SchemaProvider;
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * This is the entry point for running a Hudi Test Suite. Although this class has similarities with
+ * {@link HoodieDeltaStreamer}, it does not extend it, since we do not want to create a dependency on the
+ * changes in DeltaStreamer.
+ */
+public class HudiTestSuiteJob {
+
+  private static volatile Logger log = LogManager.getLogger(HudiTestSuiteJob.class);
+
+  private final HudiTestSuiteConfig cfg;
+  /**
+   * Bag of properties with source, hoodie client, key generator etc.
+   */
+  TypedProperties props;
+  /**
+   * Schema provider that supplies the schema for writing out the generated payloads
+   */
+  private transient SchemaProvider schemaProvider;
+  /**
+   * Filesystem used
+   */
+  private transient FileSystem fs;
+  /**
+   * Spark context
+   */
+  private transient JavaSparkContext jsc;
+  /**
+   * Spark Session
+   */
+  private transient SparkSession sparkSession;
+  /**
+   * Hive Config
+   */
+  private transient HiveConf hiveConf;
+
+  private ComplexKeyGenerator keyGenerator;
+
+  public HudiTestSuiteJob(HudiTestSuiteConfig cfg, JavaSparkContext jsc) throws IOException {
+    this.cfg = cfg;
+    this.jsc = jsc;
+    this.sparkSession = SparkSession.builder().config(jsc.getConf()).getOrCreate();
+    this.fs = FSUtils.getFs(cfg.inputBasePath, jsc.hadoopConfiguration());
+    this.props = UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();
+    log.info("Creating workload generator with configs : " + props.toString());
+    this.schemaProvider = UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jsc);
+    this.hiveConf = getDefaultHiveConf(jsc.hadoopConfiguration());
+    this.keyGenerator = (ComplexKeyGenerator) DataSourceUtils.createKeyGenerator(cfg.keyGeneratorClass, props);
+    if (!fs.exists(new Path(cfg.targetBasePath))) {
+      HoodieTableMetaClient.initTableType(jsc.hadoopConfiguration(), cfg.targetBasePath,
+          cfg.storageType, cfg.targetTableName, "archived");
+    }
+  }
+
+  private static HiveConf getDefaultHiveConf(Configuration cfg) {
+    HiveConf hiveConf = new HiveConf();
+    hiveConf.addResource(cfg);
+    return hiveConf;
+  }
+
+  public static void main(String[] args) throws Exception {
+    final HudiTestSuiteConfig cfg = new HudiTestSuiteConfig();
+    JCommander cmd = new JCommander(cfg, args);
+    if (cfg.help || args.length == 0) {
+      cmd.usage();
+      System.exit(1);
+    }
+
+    JavaSparkContext jssc = UtilHelpers.buildSparkContext("workload-generator-" + cfg.sinkTypeName
+        + "-" + cfg.sinkFormatName, cfg.sparkMaster);
+    new HudiTestSuiteJob(cfg, jssc).runTestSuite();
+  }
+
+  public Pair<List<JavaRDD<WriteStats>>, List<Pair<JavaRDD<WriteStatus>, List<List<ActionResult>>>>> runTestSuite()
+      throws Exception {
+    WorkloadOperationSequenceGenerator generator = ReflectionUtils
+        .loadClass((this.cfg).workloadGenClassName);
+    WorkloadOperationSequence operationSequence = generator.getSequence();
+    WorkloadManager workloadManager = new WorkloadManager(this.sparkSession, this.props, this.jsc,
+        this.schemaProvider.getSourceSchema().toString(), this.cfg);
+    return workloadManager.generateAndExecuteWorkload(operationSequence, this.cfg, this.keyGenerator);
+  }
+
+  /**
+   * The Hudi test suite uses {@link HoodieDeltaStreamer} to run some operations, hence this config
+   * extends the delta streamer config.
+   */
+  public static class HudiTestSuiteConfig extends HoodieDeltaStreamer.Config {
+
+    @Parameter(names = {"--input-base-path"}, description = "base path for 
input data"
+        + "(Will be created if did not exist first time around. If exists, 
more data will be added to that path)",
+        required = true)
+    public String inputBasePath;
+
+    @Parameter(names = {"--workload-generator-classname"}, description = "DAG 
of operations to generate the workload",
+        required = true)
+    public String workloadGenClassName = 
WorkloadOperationSequenceGenerator.class.getName();
+
+    @Parameter(names = {"--sink-type"}, description = "Subclass of "
 
 Review comment:
   this actually contains the input data right? should it be just called input-type/input-format instead of sink-type etc?
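
 For context, here is a minimal sketch of driving the job programmatically, mirroring the main() method in the diff above. The paths and property file below are illustrative placeholders, not values from this PR, and targetBasePath/propsFilePath/sparkMaster are assumed to be inherited from HoodieDeltaStreamer.Config, as the constructor's usage suggests:

     // Hypothetical invocation of the test suite job; all paths are placeholders.
     HudiTestSuiteJob.HudiTestSuiteConfig cfg = new HudiTestSuiteJob.HudiTestSuiteConfig();
     cfg.inputBasePath = "file:///tmp/test-suite/input";          // --input-base-path (required)
     cfg.workloadGenClassName = WorkloadOperationSequenceGenerator.class.getName();
     cfg.targetBasePath = "file:///tmp/test-suite/output";        // assumed from HoodieDeltaStreamer.Config
     cfg.propsFilePath = "file:///tmp/test-suite/test.properties"; // assumed from HoodieDeltaStreamer.Config

     // Build the Spark context the same way main() does, then run the suite.
     JavaSparkContext jsc = UtilHelpers.buildSparkContext(
         "workload-generator-" + cfg.sinkTypeName + "-" + cfg.sinkFormatName, cfg.sparkMaster);
     new HudiTestSuiteJob(cfg, jsc).runTestSuite();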
