This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 14323cb  [HUDI-344] Improve exporter tests (#1404)
14323cb is described below

commit 14323cb10012bdbf80cbb838928af9301cb42ba0
Author: Raymond Xu <2701446+xushi...@users.noreply.github.com>
AuthorDate: Sun Mar 15 05:24:30 2020 -0700

    [HUDI-344] Improve exporter tests (#1404)
---
 .../hudi/utilities/HoodieSnapshotExporter.java     |   9 +
 .../apache/hudi/utilities/DataSourceTestUtils.java |  50 ----
 .../hudi/utilities/TestHoodieSnapshotExporter.java | 318 +++++++++------------
 3 files changed, 151 insertions(+), 226 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
index f785d74..b58b5d3 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
@@ -132,6 +132,7 @@ public class HoodieSnapshotExporter {
         // No transformation is needed for output format "HUDI", just copy the original files.
         copySnapshot(jsc, fs, cfg, partitions, dataFiles, latestCommitTimestamp, serConf);
       }
+      createSuccessTag(fs, cfg.targetOutputPath);
     } else {
       LOG.info("The job has 0 partition to copy.");
     }
@@ -205,6 +206,14 @@ public class HoodieSnapshotExporter {
     }
   }
 
+  private void createSuccessTag(FileSystem fs, String targetOutputPath) throws IOException {
+    Path successTagPath = new Path(targetOutputPath + "/_SUCCESS");
+    if (!fs.exists(successTagPath)) {
+      LOG.info(String.format("Creating _SUCCESS under target output path: %s", targetOutputPath));
+      fs.createNewFile(successTagPath);
+    }
+  }
+
   public static void main(String[] args) throws IOException {
     // Take input configs
     final Config cfg = new Config();
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/DataSourceTestUtils.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/DataSourceTestUtils.java
deleted file mode 100644
index 1a96b81..0000000
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/DataSourceTestUtils.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.utilities;
-
-import org.apache.hudi.common.TestRawTripPayload;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.util.Option;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.stream.Collectors;
-
-/**
- * Test utils for data source tests.
- */
-public class DataSourceTestUtils {
-
-  public static Option<String> convertToString(HoodieRecord record) {
-    try {
-      String str = ((TestRawTripPayload) record.getData()).getJsonData();
-      str = "{" + str.substring(str.indexOf("\"timestamp\":"));
-      // Remove the last } bracket
-      str = str.substring(0, str.length() - 1);
-      return Option.of(str + ", \"partition\": \"" + record.getPartitionPath() 
+ "\"}");
-    } catch (IOException e) {
-      return Option.empty();
-    }
-  }
-
-  public static List<String> convertToStringList(List<HoodieRecord> records) {
-    return 
records.stream().map(DataSourceTestUtils::convertToString).filter(Option::isPresent).map(Option::get)
-        .collect(Collectors.toList());
-  }
-}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java
index 920f1ed..f624247 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java
@@ -18,205 +18,171 @@
 
 package org.apache.hudi.utilities;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hudi.DataSourceWriteOptions;
-import org.apache.hudi.common.HoodieCommonTestHarness;
+import org.apache.hudi.client.HoodieWriteClient;
+import org.apache.hudi.common.HoodieClientTestHarness;
 import org.apache.hudi.common.HoodieTestDataGenerator;
-import org.apache.hudi.common.model.HoodieTestUtils;
-import org.apache.hudi.common.util.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.SparkSession;
+import org.apache.hudi.index.HoodieIndex.IndexType;
+import org.apache.hudi.utilities.HoodieSnapshotExporter.Config;
 
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.SparkSession;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
 
-import java.io.File;
 import java.io.IOException;
-import java.util.HashMap;
+import java.util.Arrays;
 import java.util.List;
-import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-public class TestHoodieSnapshotExporter extends HoodieCommonTestHarness {
-  private static String TEST_WRITE_TOKEN = "1-0-1";
-
-  private SparkSession spark = null;
-  private HoodieTestDataGenerator dataGen = null;
-  private String outputPath = null;
-  private String rootPath = null;
-  private FileSystem fs = null;
-  private Map commonOpts;
-  private HoodieSnapshotExporter.Config cfg;
-  private JavaSparkContext jsc = null;
-
-  @Before
-  public void initialize() throws IOException {
-    spark = SparkSession.builder()
-        .appName("Hoodie Datasource test")
-        .master("local[2]")
-        .config("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer")
-        .getOrCreate();
-    jsc = new JavaSparkContext(spark.sparkContext());
-    dataGen = new HoodieTestDataGenerator();
-    folder.create();
-    basePath = folder.getRoot().getAbsolutePath();
-    fs = FSUtils.getFs(basePath, spark.sparkContext().hadoopConfiguration());
-    commonOpts = new HashMap();
-
-    commonOpts.put("hoodie.insert.shuffle.parallelism", "4");
-    commonOpts.put("hoodie.upsert.shuffle.parallelism", "4");
-    commonOpts.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), 
"_row_key");
-    commonOpts.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), 
"partition");
-    commonOpts.put(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), 
"timestamp");
-    commonOpts.put(HoodieWriteConfig.TABLE_NAME, "hoodie_test");
-
-
-    cfg = new HoodieSnapshotExporter.Config();
-
-    cfg.sourceBasePath = basePath;
-    cfg.targetOutputPath = outputPath = basePath + "/target";
-    cfg.outputFormat = "json";
-    cfg.outputPartitionField = "partition";
+@RunWith(Enclosed.class)
+public class TestHoodieSnapshotExporter {
+
+  static class ExporterTestHarness extends HoodieClientTestHarness {
+
+    static final Logger LOG = LogManager.getLogger(ExporterTestHarness.class);
+    static final int NUM_RECORDS = 100;
+    static final String COMMIT_TIME = "20200101000000";
+    static final String PARTITION_PATH = "2020/01/01";
+    static final String TABLE_NAME = "testing";
+    String sourcePath;
+    String targetPath;
+
+    @Before
+    public void setUp() throws Exception {
+      initSparkContexts();
+      initDFS();
+      dataGen = new HoodieTestDataGenerator(new String[] {PARTITION_PATH});
+
+      // Initialize test data dirs
+      sourcePath = dfsBasePath + "/source/";
+      targetPath = dfsBasePath + "/target/";
+      dfs.mkdirs(new Path(sourcePath));
+      dfs.mkdirs(new Path(targetPath));
+      HoodieTableMetaClient
+          .initTableType(jsc.hadoopConfiguration(), sourcePath, 
HoodieTableType.COPY_ON_WRITE, TABLE_NAME,
+              HoodieAvroPayload.class.getName());
+
+      // Prepare data as source Hudi dataset
+      HoodieWriteConfig cfg = getHoodieWriteConfig(sourcePath);
+      HoodieWriteClient hdfsWriteClient = new HoodieWriteClient(jsc, cfg);
+      hdfsWriteClient.startCommitWithTime(COMMIT_TIME);
+      List<HoodieRecord> records = dataGen.generateInserts(COMMIT_TIME, 
NUM_RECORDS);
+      JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
+      hdfsWriteClient.bulkInsert(recordsRDD, COMMIT_TIME);
+      hdfsWriteClient.close();
+
+      RemoteIterator<LocatedFileStatus> itr = dfs.listFiles(new 
Path(sourcePath), true);
+      while (itr.hasNext()) {
+        LOG.info(">>> Prepared test file: " + itr.next().getPath());
+      }
+    }
 
-  }
+    @After
+    public void tearDown() throws Exception {
+      cleanupSparkContexts();
+      cleanupDFS();
+      cleanupTestDataGenerator();
+    }
 
-  @After
-  public void cleanup() {
-    if (spark != null) {
-      spark.stop();
+    private HoodieWriteConfig getHoodieWriteConfig(String basePath) {
+      return HoodieWriteConfig.newBuilder()
+          .withPath(basePath)
+          .withEmbeddedTimelineServerEnabled(false)
+          .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+          .withParallelism(2, 2)
+          .withBulkInsertParallelism(2)
+          .forTable(TABLE_NAME)
+          .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(IndexType.BLOOM).build())
+          .build();
     }
   }
 
-  @Test
-  public void testSnapshotExporter() throws IOException {
-    // Insert Operation
-    List<String> records = 
DataSourceTestUtils.convertToStringList(dataGen.generateInserts("000", 100));
-    Dataset<Row> inputDF = spark.read().json(new 
JavaSparkContext(spark.sparkContext()).parallelize(records, 2));
-    inputDF.write().format("hudi")
-        .options(commonOpts)
-        .option(DataSourceWriteOptions.OPERATION_OPT_KEY(), 
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL())
-        .mode(SaveMode.Overwrite)
-        .save(basePath);
-    long sourceCount = inputDF.count();
-
-    HoodieSnapshotExporter hoodieSnapshotExporter = new 
HoodieSnapshotExporter();
-    hoodieSnapshotExporter.export(spark, cfg);
-
-    long targetCount = spark.read().json(outputPath).count();
-
-    assertTrue(sourceCount == targetCount);
-
-    // Test Invalid OutputFormat
-    cfg.outputFormat = "foo";
-    int isError = hoodieSnapshotExporter.export(spark, cfg);
-    assertTrue(isError == -1);
-  }
+  public static class TestHoodieSnapshotExporterForHudi extends 
ExporterTestHarness {
+
+    @Test
+    public void testExportAsHudi() throws IOException {
+      HoodieSnapshotExporter.Config cfg = new Config();
+      cfg.sourceBasePath = sourcePath;
+      cfg.targetOutputPath = targetPath;
+      cfg.outputFormat = "hudi";
+      new HoodieSnapshotExporter().export(SparkSession.builder().config(jsc.getConf()).getOrCreate(), cfg);
+
+      // Check results
+      assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + 
".clean")));
+      assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + 
".clean.inflight")));
+      assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + 
".clean.requested")));
+      assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + 
".commit")));
+      assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + 
".commit.requested")));
+      assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + 
".inflight")));
+      assertTrue(dfs.exists(new Path(targetPath + 
"/.hoodie/hoodie.properties")));
+      String partition = targetPath + "/" + PARTITION_PATH;
+      long numParquetFiles = Arrays.stream(dfs.listStatus(new Path(partition)))
+          .filter(fileStatus -> 
fileStatus.getPath().toString().endsWith(".parquet"))
+          .count();
+      assertTrue("There should exist at least 1 parquet file.", 
numParquetFiles >= 1);
+      assertEquals(NUM_RECORDS, sqlContext.read().parquet(partition).count());
+      assertTrue(dfs.exists(new Path(partition + 
"/.hoodie_partition_metadata")));
+      assertTrue(dfs.exists(new Path(targetPath + "/_SUCCESS")));
+    }
 
-  // for testEmptySnapshotCopy
-  public void init() throws IOException {
-    TemporaryFolder folder = new TemporaryFolder();
-    folder.create();
-    rootPath = "file://" + folder.getRoot().getAbsolutePath();
-    basePath = rootPath + "/" + HoodieTestUtils.RAW_TRIPS_TEST_NAME;
-    outputPath = rootPath + "/output";
-
-    final Configuration hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
-    fs = FSUtils.getFs(basePath, hadoopConf);
-    HoodieTestUtils.init(hadoopConf, basePath);
+    @Test
+    public void testExportEmptyDataset() throws IOException {
+      // delete all source data
+      dfs.delete(new Path(sourcePath + "/" + PARTITION_PATH), true);
+
+      // export
+      HoodieSnapshotExporter.Config cfg = new Config();
+      cfg.sourceBasePath = sourcePath;
+      cfg.targetOutputPath = targetPath;
+      cfg.outputFormat = "hudi";
+      new HoodieSnapshotExporter().export(SparkSession.builder().config(jsc.getConf()).getOrCreate(), cfg);
+
+      // Check results
+      assertEquals("Target path should be empty.", 0, dfs.listStatus(new 
Path(targetPath)).length);
+      assertFalse(dfs.exists(new Path(targetPath + "/_SUCCESS")));
+    }
   }
 
-  @Test
-  public void testEmptySnapshotCopy() throws IOException {
-    init();
-    // There is no real data (only .hoodie directory)
-    assertEquals(fs.listStatus(new Path(basePath)).length, 1);
-    assertFalse(fs.exists(new Path(outputPath)));
-
-    // Do the snapshot
-    HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
-    copier.snapshot(jsc, basePath, outputPath, true);
+  @RunWith(Parameterized.class)
+  public static class TestHoodieSnapshotExporterForNonHudi extends 
ExporterTestHarness {
 
-    // Nothing changed; we just bail out
-    assertEquals(fs.listStatus(new Path(basePath)).length, 1);
-    assertFalse(fs.exists(new Path(outputPath + "/_SUCCESS")));
-  }
+    @Parameters
+    public static Iterable<String[]> formats() {
+      return Arrays.asList(new String[][] {{"json"}, {"parquet"}});
+    }
 
-  // TODO - uncomment this after fixing test failures
-  // @Test
-  public void testSnapshotCopy() throws Exception {
-    // Generate some commits and corresponding parquets
-    String commitTime1 = "20160501010101";
-    String commitTime2 = "20160502020601";
-    String commitTime3 = "20160506030611";
-    new File(basePath + "/.hoodie").mkdirs();
-    new File(basePath + "/.hoodie/hoodie.properties").createNewFile();
-    // Only first two have commit files
-    new File(basePath + "/.hoodie/" + commitTime1 + ".commit").createNewFile();
-    new File(basePath + "/.hoodie/" + commitTime2 + ".commit").createNewFile();
-    new File(basePath + "/.hoodie/" + commitTime3 + 
".inflight").createNewFile();
-
-    // Some parquet files
-    new File(basePath + "/2016/05/01/").mkdirs();
-    new File(basePath + "/2016/05/02/").mkdirs();
-    new File(basePath + "/2016/05/06/").mkdirs();
-    HoodieTestDataGenerator.writePartitionMetadata(fs, new 
String[]{"2016/05/01", "2016/05/02", "2016/05/06"},
-        basePath);
-    // Make commit1
-    File file11 = new File(basePath + "/2016/05/01/" + 
FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, "id11"));
-    file11.createNewFile();
-    File file12 = new File(basePath + "/2016/05/02/" + 
FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, "id12"));
-    file12.createNewFile();
-    File file13 = new File(basePath + "/2016/05/06/" + 
FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, "id13"));
-    file13.createNewFile();
-
-    // Make commit2
-    File file21 = new File(basePath + "/2016/05/01/" + 
FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, "id21"));
-    file21.createNewFile();
-    File file22 = new File(basePath + "/2016/05/02/" + 
FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, "id22"));
-    file22.createNewFile();
-    File file23 = new File(basePath + "/2016/05/06/" + 
FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, "id23"));
-    file23.createNewFile();
-
-    // Make commit3
-    File file31 = new File(basePath + "/2016/05/01/" + 
FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, "id31"));
-    file31.createNewFile();
-    File file32 = new File(basePath + "/2016/05/02/" + 
FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, "id32"));
-    file32.createNewFile();
-    File file33 = new File(basePath + "/2016/05/06/" + 
FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, "id33"));
-    file33.createNewFile();
-
-    // Do a snapshot copy
-    HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
-    copier.snapshot(jsc, basePath, outputPath, false);
-
-    // Check results
-    assertTrue(fs.exists(new Path(outputPath + "/2016/05/01/" + 
file11.getName())));
-    assertTrue(fs.exists(new Path(outputPath + "/2016/05/02/" + 
file12.getName())));
-    assertTrue(fs.exists(new Path(outputPath + "/2016/05/06/" + 
file13.getName())));
-    assertTrue(fs.exists(new Path(outputPath + "/2016/05/01/" + 
file21.getName())));
-    assertTrue(fs.exists(new Path(outputPath + "/2016/05/02/" + 
file22.getName())));
-    assertTrue(fs.exists(new Path(outputPath + "/2016/05/06/" + 
file23.getName())));
-    assertFalse(fs.exists(new Path(outputPath + "/2016/05/01/" + 
file31.getName())));
-    assertFalse(fs.exists(new Path(outputPath + "/2016/05/02/" + 
file32.getName())));
-    assertFalse(fs.exists(new Path(outputPath + "/2016/05/06/" + 
file33.getName())));
-
-    assertTrue(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime1 + 
".commit")));
-    assertTrue(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime2 + 
".commit")));
-    assertFalse(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime3 + 
".commit")));
-    assertFalse(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime3 + 
".inflight")));
-    assertTrue(fs.exists(new Path(outputPath + "/.hoodie/hoodie.properties")));
-
-    assertTrue(fs.exists(new Path(outputPath + "/_SUCCESS")));
+    @Parameter
+    public String format;
+
+    @Test
+    public void testExportAsNonHudi() throws IOException {
+      HoodieSnapshotExporter.Config cfg = new Config();
+      cfg.sourceBasePath = sourcePath;
+      cfg.targetOutputPath = targetPath;
+      cfg.outputFormat = format;
+      new HoodieSnapshotExporter().export(SparkSession.builder().config(jsc.getConf()).getOrCreate(), cfg);
+      assertEquals(NUM_RECORDS, sqlContext.read().format(format).load(targetPath).count());
+      assertTrue(dfs.exists(new Path(targetPath + "/_SUCCESS")));
+    }
   }
 }

Reply via email to