This is an automated email from the ASF dual-hosted git repository.

nagarwal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new fec7cd3  [HUDI-1130] hudi-test-suite support for schema evolution (can be triggered on any insert/upsert DAG node).
fec7cd3 is described below

commit fec7cd3c9704ce366d1d92fd4d9941a7edbe8198
Author: Balajee Nagasubramaniam <[email protected]>
AuthorDate: Mon Aug 24 15:56:07 2020 -0700

    [HUDI-1130] hudi-test-suite support for schema evolution (can be triggered on any insert/upsert DAG node).
---
 .../hudi/integ/testsuite/HoodieTestSuiteJob.java   |  19 +---
 .../integ/testsuite/configuration/DeltaConfig.java |  10 ++
 .../hudi/integ/testsuite/dag/ExecutionContext.java |  16 ++--
 .../hudi/integ/testsuite/dag/WriterContext.java    | 102 +++++++++++++++++++++
 .../hudi/integ/testsuite/dag/nodes/InsertNode.java |   5 +
 .../testsuite/dag/scheduler/DagScheduler.java      |   7 +-
 6 files changed, 133 insertions(+), 26 deletions(-)

diff --git 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
index de8000c..2c4b73a 100644
--- 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
+++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
@@ -26,21 +26,19 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hudi.DataSourceUtils;
-import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.integ.testsuite.configuration.DFSDeltaConfig;
 import org.apache.hudi.integ.testsuite.dag.DagUtils;
 import org.apache.hudi.integ.testsuite.dag.WorkflowDag;
 import org.apache.hudi.integ.testsuite.dag.WorkflowDagGenerator;
 import org.apache.hudi.integ.testsuite.dag.scheduler.DagScheduler;
-import org.apache.hudi.integ.testsuite.generator.DeltaGenerator;
 import org.apache.hudi.integ.testsuite.reader.DeltaInputType;
 import org.apache.hudi.integ.testsuite.writer.DeltaOutputMode;
+import org.apache.hudi.integ.testsuite.dag.WriterContext;
 import org.apache.hudi.keygen.BuiltinKeyGenerator;
 import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
@@ -65,10 +63,6 @@ public class HoodieTestSuiteJob {
    */
   TypedProperties props;
   /**
-   * Schema provider that supplies the command for writing out the generated 
payloads.
-   */
-  private transient SchemaProvider schemaProvider;
-  /**
    * Filesystem used.
    */
   private transient FileSystem fs;
@@ -95,7 +89,6 @@ public class HoodieTestSuiteJob {
     this.fs = FSUtils.getFs(cfg.inputBasePath, jsc.hadoopConfiguration());
     this.props = UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), 
cfg.configs).getConfig();
     log.info("Creating workload generator with configs : {}", 
props.toString());
-    this.schemaProvider = 
UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jsc);
     this.hiveConf = getDefaultHiveConf(jsc.hadoopConfiguration());
     this.keyGenerator = (BuiltinKeyGenerator) 
DataSourceUtils.createKeyGenerator(props);
 
@@ -138,13 +131,9 @@ public class HoodieTestSuiteJob {
       WorkflowDag workflowDag = createWorkflowDag();
       log.info("Workflow Dag => " + DagUtils.convertDagToYaml(workflowDag));
       long startTime = System.currentTimeMillis();
-      String schemaStr = schemaProvider.getSourceSchema().toString();
-      final HoodieTestSuiteWriter writer = new HoodieTestSuiteWriter(jsc, 
props, cfg, schemaStr);
-      final DeltaGenerator deltaGenerator = new DeltaGenerator(
-          new DFSDeltaConfig(DeltaOutputMode.valueOf(cfg.outputTypeName), 
DeltaInputType.valueOf(cfg.inputFormatName),
-              new SerializableConfiguration(jsc.hadoopConfiguration()), 
cfg.inputBasePath, cfg.targetBasePath,
-              schemaStr, cfg.limitFileSize), jsc, sparkSession, schemaStr, 
keyGenerator);
-      DagScheduler dagScheduler = new DagScheduler(workflowDag, writer, 
deltaGenerator);
+      WriterContext writerContext = new WriterContext(jsc, props, cfg, 
keyGenerator, sparkSession);
+      writerContext.initContext(jsc);
+      DagScheduler dagScheduler = new DagScheduler(workflowDag, writerContext);
       dagScheduler.schedule();
       log.info("Finished scheduling all tasks, Time taken {}", 
System.currentTimeMillis() - startTime);
     } catch (Exception e) {
diff --git 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
index 30fa584..f20f84e 100644
--- 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
+++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
@@ -82,6 +82,7 @@ public class DeltaConfig implements Serializable {
     private static String DISABLE_GENERATE = "disable_generate";
     private static String DISABLE_INGEST = "disable_ingest";
     private static String HIVE_LOCAL = "hive_local";
+    private static String REINIT_CONTEXT = "reinitialize_context";
 
     private Map<String, Object> configsMap;
 
@@ -133,6 +134,10 @@ public class DeltaConfig implements Serializable {
       return Boolean.valueOf(configsMap.getOrDefault(DISABLE_INGEST, 
false).toString());
     }
 
+    public boolean getReinitContext() {
+      return Boolean.valueOf(configsMap.getOrDefault(REINIT_CONTEXT, 
false).toString());
+    }
+
     public Map<String, Object> getOtherConfigs() {
       if (configsMap == null) {
         return new HashMap<>();
@@ -222,6 +227,11 @@ public class DeltaConfig implements Serializable {
         return this;
       }
 
+      public Builder reinitializeContext(boolean reinitContext) {
+        this.configsMap.put(REINIT_CONTEXT, reinitContext);
+        return this;
+      }
+
       public Builder withConfig(String name, Object value) {
         this.configsMap.put(name, value);
         return this;
diff --git 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/ExecutionContext.java
 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/ExecutionContext.java
index eecd763..17148f5 100644
--- 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/ExecutionContext.java
+++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/ExecutionContext.java
@@ -30,25 +30,27 @@ import org.apache.spark.api.java.JavaSparkContext;
  */
 public class ExecutionContext implements Serializable {
 
-  private HoodieTestSuiteWriter hoodieTestSuiteWriter;
-  private DeltaGenerator deltaGenerator;
+  private WriterContext writerContext;
   private transient JavaSparkContext jsc;
 
-  public ExecutionContext(JavaSparkContext jsc, HoodieTestSuiteWriter 
hoodieTestSuiteWriter, DeltaGenerator deltaGenerator) {
-    this.hoodieTestSuiteWriter = hoodieTestSuiteWriter;
-    this.deltaGenerator = deltaGenerator;
+  public ExecutionContext(JavaSparkContext jsc, WriterContext writerContext) {
+    this.writerContext = writerContext;
     this.jsc = jsc;
   }
 
   public HoodieTestSuiteWriter getHoodieTestSuiteWriter() {
-    return hoodieTestSuiteWriter;
+    return writerContext.getHoodieTestSuiteWriter();
   }
 
   public DeltaGenerator getDeltaGenerator() {
-    return deltaGenerator;
+    return writerContext.getDeltaGenerator();
   }
 
   public JavaSparkContext getJsc() {
     return jsc;
   }
+
+  public WriterContext getWriterContext() {
+    return writerContext;
+  }
 }
diff --git 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java
 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java
new file mode 100644
index 0000000..320c986
--- /dev/null
+++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.integ.testsuite.dag;
+
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.integ.testsuite.HoodieTestSuiteWriter;
+import org.apache.hudi.integ.testsuite.configuration.DFSDeltaConfig;
+import org.apache.hudi.integ.testsuite.reader.DeltaInputType;
+import org.apache.hudi.integ.testsuite.writer.DeltaOutputMode;
+import org.apache.hudi.keygen.BuiltinKeyGenerator;
+import org.apache.hudi.integ.testsuite.generator.DeltaGenerator;
+import 
org.apache.hudi.integ.testsuite.HoodieTestSuiteJob.HoodieTestSuiteConfig;
+import org.apache.hudi.utilities.UtilHelpers;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+import java.util.Map;
+
+/**
+ * WriterContext wraps the delta writer/data generator related configuration 
needed
+ * to init/reinit.
+ */
+public class WriterContext {
+
+  protected static Logger log = LogManager.getLogger(WriterContext.class);
+
+  private final HoodieTestSuiteConfig cfg;
+  private TypedProperties props;
+  private HoodieTestSuiteWriter hoodieTestSuiteWriter;
+  private DeltaGenerator deltaGenerator;
+  private transient SchemaProvider schemaProvider;
+  private BuiltinKeyGenerator keyGenerator;
+  private transient SparkSession sparkSession;
+  private transient JavaSparkContext jsc;
+  public WriterContext(JavaSparkContext jsc, TypedProperties props, 
HoodieTestSuiteConfig cfg,
+                       BuiltinKeyGenerator keyGenerator, SparkSession 
sparkSession) {
+    this.cfg = cfg;
+    this.props = props;
+    this.keyGenerator = keyGenerator;
+    this.sparkSession = sparkSession;
+    this.jsc = jsc;
+  }
+
+  public void initContext(JavaSparkContext jsc) throws HoodieException {
+    try {
+      this.schemaProvider = 
UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jsc);
+      String schemaStr = schemaProvider.getSourceSchema().toString();
+      this.hoodieTestSuiteWriter = new HoodieTestSuiteWriter(jsc, props, cfg, 
schemaStr);
+      this.deltaGenerator = new DeltaGenerator(
+          new DFSDeltaConfig(DeltaOutputMode.valueOf(cfg.outputTypeName), 
DeltaInputType.valueOf(cfg.inputFormatName),
+              new SerializableConfiguration(jsc.hadoopConfiguration()), 
cfg.inputBasePath, cfg.targetBasePath,
+              schemaStr, cfg.limitFileSize),
+          jsc, sparkSession, schemaStr, keyGenerator);
+      log.info(String.format("Initialized writerContext with: %s", schemaStr));
+    } catch (Exception e) {
+      throw new HoodieException("Failed to reinitialize writerContext", e);
+    }
+  }
+
+  public void reinitContext(Map<String, Object> newConfig) throws 
HoodieException {
+    // update props with any config overrides.
+    for (Map.Entry<String, Object> e : newConfig.entrySet()) {
+      if (this.props.containsKey(e.getKey())) {
+        this.props.setProperty(e.getKey(), e.getValue().toString());
+      }
+    }
+    initContext(jsc);
+  }
+
+  public HoodieTestSuiteWriter getHoodieTestSuiteWriter() {
+    return hoodieTestSuiteWriter;
+  }
+
+  public DeltaGenerator getDeltaGenerator() {
+    return deltaGenerator;
+  }
+
+  public String toString() {
+    return this.hoodieTestSuiteWriter.toString() + "\n" + 
this.deltaGenerator.toString() + "\n";
+  }
+}
diff --git 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/InsertNode.java
 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/InsertNode.java
index 62db5b6..fdbcc1b 100644
--- 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/InsertNode.java
+++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/InsertNode.java
@@ -37,6 +37,11 @@ public class InsertNode extends 
DagNode<JavaRDD<WriteStatus>> {
 
   @Override
   public void execute(ExecutionContext executionContext) throws Exception {
+    // if the insert node has schema override set, reinitialize the table with 
new schema.
+    if (this.config.getReinitContext()) {
+      log.info(String.format("Reinitializing table with %s", 
this.config.getOtherConfigs().toString()));
+      
executionContext.getWriterContext().reinitContext(this.config.getOtherConfigs());
+    }
     generate(executionContext.getDeltaGenerator());
     log.info("Configs : {}", this.config);
     if (!config.isDisableIngest()) {
diff --git 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java
 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java
index ce94341..2e9d437 100644
--- 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java
+++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java
@@ -30,10 +30,9 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
-import org.apache.hudi.integ.testsuite.HoodieTestSuiteWriter;
 import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
 import org.apache.hudi.integ.testsuite.dag.WorkflowDag;
-import org.apache.hudi.integ.testsuite.generator.DeltaGenerator;
+import org.apache.hudi.integ.testsuite.dag.WriterContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,9 +46,9 @@ public class DagScheduler {
   private WorkflowDag workflowDag;
   private ExecutionContext executionContext;
 
-  public DagScheduler(WorkflowDag workflowDag, HoodieTestSuiteWriter 
hoodieTestSuiteWriter, DeltaGenerator deltaGenerator) {
+  public DagScheduler(WorkflowDag workflowDag, WriterContext writerContext) {
     this.workflowDag = workflowDag;
-    this.executionContext = new ExecutionContext(null, hoodieTestSuiteWriter, 
deltaGenerator);
+    this.executionContext = new ExecutionContext(null, writerContext);
   }
 
   /**

Reply via email to