This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 44700d5  [HUDI-344] Hudi Dataset Snapshot Exporter (#1360)
44700d5 is described below

commit 44700d531a74f24762903df2729577a0d96e4ec0
Author: openopen2 <a261049...@outlook.com>
AuthorDate: Tue Mar 10 09:17:51 2020 +0800

    [HUDI-344] Hudi Dataset Snapshot Exporter (#1360)
    
    Co-authored-by: jason1993 <261049...@qq.com>
---
 .../hudi/utilities/HoodieSnapshotExporter.java     | 224 +++++++++++++++++++++
 .../apache/hudi/utilities/DataSourceTestUtils.java |  50 +++++
 .../hudi/utilities/TestHoodieSnapshotExporter.java | 222 ++++++++++++++++++++
 3 files changed, 496 insertions(+)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
new file mode 100644
index 0000000..f785d74
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.SerializableConfiguration;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTimeline;
+import org.apache.hudi.common.table.TableFileSystemView;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.DataFrameWriter;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.execution.datasources.DataSource;
+
+import scala.Tuple2;
+import scala.collection.JavaConversions;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Export the latest records of a Hudi dataset to a set of external files (e.g., plain parquet files).
+ *
+ * @experimental This export is an experimental tool. If you want to export Hudi to Hudi, please use HoodieSnapshotCopier.
+ */
+public class HoodieSnapshotExporter {
+  private static final Logger LOG = LogManager.getLogger(HoodieSnapshotExporter.class);
+
+  public static class Config implements Serializable {
+    @Parameter(names = {"--source-base-path"}, description = "Base path for 
the source Hudi dataset to be snapshotted", required = true)
+    String sourceBasePath = null;
+
+    @Parameter(names = {"--target-base-path"}, description = "Base path for 
the target output files (snapshots)", required = true)
+    String targetOutputPath = null;
+
+    @Parameter(names = {"--output-format"}, description = "e.g. Hudi or 
Parquet", required = true)
+    String outputFormat;
+
+    @Parameter(names = {"--output-partition-field"}, description = "A field to 
be used by Spark repartitioning")
+    String outputPartitionField;
+  }
+
+  public int export(SparkSession spark, Config cfg) throws IOException {
+    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
+    FileSystem fs = FSUtils.getFs(cfg.sourceBasePath, jsc.hadoopConfiguration());
+
+    final SerializableConfiguration serConf = new SerializableConfiguration(jsc.hadoopConfiguration());
+    final HoodieTableMetaClient tableMetadata = new HoodieTableMetaClient(fs.getConf(), cfg.sourceBasePath);
+    final TableFileSystemView.BaseFileOnlyView fsView = new HoodieTableFileSystemView(tableMetadata,
+        tableMetadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants());
+    // Get the latest commit
+    Option<HoodieInstant> latestCommit =
+        tableMetadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
+    if (!latestCommit.isPresent()) {
+      LOG.error("No commits present. Nothing to snapshot");
+      return -1;
+    }
+    final String latestCommitTimestamp = latestCommit.get().getTimestamp();
+    LOG.info(String.format("Starting to snapshot latest version files which are also no later than %s.",
+        latestCommitTimestamp));
+
+    List<String> partitions = FSUtils.getAllPartitionPaths(fs, cfg.sourceBasePath, false);
+    if (partitions.size() > 0) {
+      List<String> dataFiles = new ArrayList<>();
+
+      for (String partition : partitions) {
+        dataFiles.addAll(fsView.getLatestBaseFilesBeforeOrOn(partition, latestCommitTimestamp).map(f -> f.getPath()).collect(Collectors.toList()));
+      }
+
+      try {
+        DataSource.lookupDataSource(cfg.outputFormat, spark.sessionState().conf());
+      } catch (Exception e) {
+        LOG.error(String.format("The %s output format is not supported! ", cfg.outputFormat));
+        return -1;
+      }
+      if (!cfg.outputFormat.equalsIgnoreCase("hudi")) {
+        // Do transformation
+        // A field to do simple Spark repartitioning
+        DataFrameWriter<Row> write = null;
+        Dataset<Row> original = spark.read().parquet(JavaConversions.asScalaIterator(dataFiles.iterator()).toSeq());
+        List<Column> needColumns = Arrays.asList(original.columns()).stream().filter(col -> !col.startsWith("_hoodie_")).map(col -> new Column(col)).collect(Collectors.toList());
+        Dataset<Row> reader = original.select(JavaConversions.asScalaIterator(needColumns.iterator()).toSeq());
+        if (!StringUtils.isNullOrEmpty(cfg.outputPartitionField)) {
+          write = reader.repartition(new Column(cfg.outputPartitionField))
+              .write().partitionBy(cfg.outputPartitionField);
+        } else {
+          write = reader.write();
+        }
+        write.format(cfg.outputFormat)
+            .mode(SaveMode.Overwrite)
+            .save(cfg.targetOutputPath);
+      } else {
+        // No transformation is needed for output format "HUDI", just copy the original files.
+        copySnapshot(jsc, fs, cfg, partitions, dataFiles, latestCommitTimestamp, serConf);
+      }
+    } else {
+      LOG.info("The job has 0 partition to copy.");
+    }
+    return 0;
+  }
+
+  private void copySnapshot(JavaSparkContext jsc,
+                            FileSystem fs,
+                            Config cfg,
+                            List<String> partitions,
+                            List<String> dataFiles,
+                            String latestCommitTimestamp,
+                            SerializableConfiguration serConf) throws IOException {
+    // Make sure the output directory is empty
+    Path outputPath = new Path(cfg.targetOutputPath);
+    if (fs.exists(outputPath)) {
+      LOG.warn(String.format("The output path %s targetBasePath already 
exists, deleting", outputPath));
+      fs.delete(new Path(cfg.targetOutputPath), true);
+    }
+
+    jsc.parallelize(partitions, partitions.size()).flatMap(partition -> {
+      // Only take latest version files <= latestCommit.
+      FileSystem fs1 = FSUtils.getFs(cfg.sourceBasePath, serConf.newCopy());
+      List<Tuple2<String, String>> filePaths = new ArrayList<>();
+      dataFiles.forEach(hoodieDataFile -> filePaths.add(new Tuple2<>(partition, hoodieDataFile)));
+
+      // also need to copy over partition metadata
+      Path partitionMetaFile =
+          new Path(new Path(cfg.sourceBasePath, partition), HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE);
+      if (fs1.exists(partitionMetaFile)) {
+        filePaths.add(new Tuple2<>(partition, partitionMetaFile.toString()));
+      }
+
+      return filePaths.iterator();
+    }).foreach(tuple -> {
+      String partition = tuple._1();
+      Path sourceFilePath = new Path(tuple._2());
+      Path toPartitionPath = new Path(cfg.targetOutputPath, partition);
+      FileSystem ifs = FSUtils.getFs(cfg.targetOutputPath, serConf.newCopy());
+
+      if (!ifs.exists(toPartitionPath)) {
+        ifs.mkdirs(toPartitionPath);
+      }
+      FileUtil.copy(ifs, sourceFilePath, ifs, new Path(toPartitionPath, sourceFilePath.getName()), false,
+          ifs.getConf());
+    });
+
+    // Also copy the .commit files
+    LOG.info(String.format("Copying .commit files which are no-late-than %s.", 
latestCommitTimestamp));
+    FileStatus[] commitFilesToCopy =
+        fs.listStatus(new Path(cfg.sourceBasePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME), (commitFilePath) -> {
+          if (commitFilePath.getName().equals(HoodieTableConfig.HOODIE_PROPERTIES_FILE)) {
+            return true;
+          } else {
+            String commitTime = FSUtils.getCommitFromCommitFile(commitFilePath.getName());
+            return HoodieTimeline.compareTimestamps(commitTime, latestCommitTimestamp,
+                HoodieTimeline.LESSER_OR_EQUAL);
+          }
+        });
+    for (FileStatus commitStatus : commitFilesToCopy) {
+      Path targetFilePath =
+          new Path(cfg.targetOutputPath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + commitStatus.getPath().getName());
+      if (!fs.exists(targetFilePath.getParent())) {
+        fs.mkdirs(targetFilePath.getParent());
+      }
+      if (fs.exists(targetFilePath)) {
+        LOG.error(
+            String.format("The target output commit file (%s targetBasePath) 
already exists.", targetFilePath));
+      }
+      FileUtil.copy(fs, commitStatus.getPath(), fs, targetFilePath, false, fs.getConf());
+    }
+  }
+
+  public static void main(String[] args) throws IOException {
+    // Take input configs
+    final Config cfg = new Config();
+    new JCommander(cfg, null, args);
+
+    // Create a spark job to do the snapshot export
+    SparkSession spark = SparkSession.builder().appName("Hoodie-snapshot-exporter")
+        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").getOrCreate();
+    LOG.info("Initializing spark job.");
+
+    HoodieSnapshotExporter hoodieSnapshotExporter = new HoodieSnapshotExporter();
+    hoodieSnapshotExporter.export(spark, cfg);
+
+    // Stop the job
+    spark.stop();
+  }
+}
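
For context, here is a minimal sketch of driving the exporter above programmatically. The Config fields and the export(SparkSession, Config) signature come from the class in this commit; the package declaration is needed because Config's fields are package-private; the paths, output format value, and app name are illustrative assumptions, not part of this commit.

    package org.apache.hudi.utilities;

    import org.apache.spark.sql.SparkSession;

    public class SnapshotExportExample {
      public static void main(String[] args) throws Exception {
        // Illustrative values only; real paths depend on your environment.
        HoodieSnapshotExporter.Config cfg = new HoodieSnapshotExporter.Config();
        cfg.sourceBasePath = "hdfs:///data/hudi/trips";          // assumed source dataset path
        cfg.targetOutputPath = "hdfs:///exports/trips_parquet";  // assumed export location
        cfg.outputFormat = "parquet";            // any Spark format other than "hudi" takes the transform path
        cfg.outputPartitionField = "partition";  // optional; drives repartition()/partitionBy()

        SparkSession spark = SparkSession.builder()
            .appName("hudi-snapshot-export-example")
            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .getOrCreate();
        int exitCode = new HoodieSnapshotExporter().export(spark, cfg); // 0 on success, -1 on error
        spark.stop();
        System.exit(exitCode);
      }
    }

The class's own main() does the same work end to end, parsing --source-base-path, --target-base-path, --output-format and --output-partition-field with JCommander before calling export().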
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/DataSourceTestUtils.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/DataSourceTestUtils.java
new file mode 100644
index 0000000..1a96b81
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/DataSourceTestUtils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import org.apache.hudi.common.TestRawTripPayload;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Test utils for data source tests.
+ */
+public class DataSourceTestUtils {
+
+  public static Option<String> convertToString(HoodieRecord record) {
+    try {
+      String str = ((TestRawTripPayload) record.getData()).getJsonData();
+      str = "{" + str.substring(str.indexOf("\"timestamp\":"));
+      // Remove the last } bracket
+      str = str.substring(0, str.length() - 1);
+      return Option.of(str + ", \"partition\": \"" + record.getPartitionPath() + "\"}");
+    } catch (IOException e) {
+      return Option.empty();
+    }
+  }
+
+  public static List<String> convertToStringList(List<HoodieRecord> records) {
+    return records.stream().map(DataSourceTestUtils::convertToString).filter(Option::isPresent).map(Option::get)
+        .collect(Collectors.toList());
+  }
+}
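
A condensed sketch of how these helpers are used, mirroring the insert step in TestHoodieSnapshotExporter below (the commit time "000" and record count 100 are arbitrary test values; dataGen, spark and jsc are the HoodieTestDataGenerator, SparkSession and JavaSparkContext set up in that test):

    // Convert generated HoodieRecords to JSON strings and load them as a DataFrame for a test write.
    List<String> records = DataSourceTestUtils.convertToStringList(dataGen.generateInserts("000", 100));
    Dataset<Row> inputDF = spark.read().json(jsc.parallelize(records, 2));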
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java
new file mode 100644
index 0000000..920f1ed
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.common.HoodieCommonTestHarness;
+import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.model.HoodieTestUtils;
+import org.apache.hudi.common.util.FSUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestHoodieSnapshotExporter extends HoodieCommonTestHarness {
+  private static String TEST_WRITE_TOKEN = "1-0-1";
+
+  private SparkSession spark = null;
+  private HoodieTestDataGenerator dataGen = null;
+  private String outputPath = null;
+  private String rootPath = null;
+  private FileSystem fs = null;
+  private Map commonOpts;
+  private HoodieSnapshotExporter.Config cfg;
+  private JavaSparkContext jsc = null;
+
+  @Before
+  public void initialize() throws IOException {
+    spark = SparkSession.builder()
+        .appName("Hoodie Datasource test")
+        .master("local[2]")
+        .config("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer")
+        .getOrCreate();
+    jsc = new JavaSparkContext(spark.sparkContext());
+    dataGen = new HoodieTestDataGenerator();
+    folder.create();
+    basePath = folder.getRoot().getAbsolutePath();
+    fs = FSUtils.getFs(basePath, spark.sparkContext().hadoopConfiguration());
+    commonOpts = new HashMap();
+
+    commonOpts.put("hoodie.insert.shuffle.parallelism", "4");
+    commonOpts.put("hoodie.upsert.shuffle.parallelism", "4");
+    commonOpts.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key");
+    commonOpts.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition");
+    commonOpts.put(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "timestamp");
+    commonOpts.put(HoodieWriteConfig.TABLE_NAME, "hoodie_test");
+
+    cfg = new HoodieSnapshotExporter.Config();
+
+    cfg.sourceBasePath = basePath;
+    cfg.targetOutputPath = outputPath = basePath + "/target";
+    cfg.outputFormat = "json";
+    cfg.outputPartitionField = "partition";
+
+  }
+
+  @After
+  public void cleanup() {
+    if (spark != null) {
+      spark.stop();
+    }
+  }
+
+  @Test
+  public void testSnapshotExporter() throws IOException {
+    // Insert Operation
+    List<String> records = DataSourceTestUtils.convertToStringList(dataGen.generateInserts("000", 100));
+    Dataset<Row> inputDF = spark.read().json(new JavaSparkContext(spark.sparkContext()).parallelize(records, 2));
+    inputDF.write().format("hudi")
+        .options(commonOpts)
+        .option(DataSourceWriteOptions.OPERATION_OPT_KEY(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL())
+        .mode(SaveMode.Overwrite)
+        .save(basePath);
+    long sourceCount = inputDF.count();
+
+    HoodieSnapshotExporter hoodieSnapshotExporter = new HoodieSnapshotExporter();
+    hoodieSnapshotExporter.export(spark, cfg);
+
+    long targetCount = spark.read().json(outputPath).count();
+
+    assertTrue(sourceCount == targetCount);
+
+    // Test Invalid OutputFormat
+    cfg.outputFormat = "foo";
+    int isError = hoodieSnapshotExporter.export(spark, cfg);
+  }
+
+  // for testEmptySnapshotCopy
+  public void init() throws IOException {
+    TemporaryFolder folder = new TemporaryFolder();
+    folder.create();
+    rootPath = "file://" + folder.getRoot().getAbsolutePath();
+    basePath = rootPath + "/" + HoodieTestUtils.RAW_TRIPS_TEST_NAME;
+    outputPath = rootPath + "/output";
+
+    final Configuration hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
+    fs = FSUtils.getFs(basePath, hadoopConf);
+    HoodieTestUtils.init(hadoopConf, basePath);
+  }
+
+  @Test
+  public void testEmptySnapshotCopy() throws IOException {
+    init();
+    // There is no real data (only .hoodie directory)
+    assertEquals(fs.listStatus(new Path(basePath)).length, 1);
+    assertFalse(fs.exists(new Path(outputPath)));
+
+    // Do the snapshot
+    HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
+    copier.snapshot(jsc, basePath, outputPath, true);
+
+    // Nothing changed; we just bail out
+    assertEquals(fs.listStatus(new Path(basePath)).length, 1);
+    assertFalse(fs.exists(new Path(outputPath + "/_SUCCESS")));
+  }
+
+  // TODO - uncomment this after fixing test failures
+  // @Test
+  public void testSnapshotCopy() throws Exception {
+    // Generate some commits and corresponding parquets
+    String commitTime1 = "20160501010101";
+    String commitTime2 = "20160502020601";
+    String commitTime3 = "20160506030611";
+    new File(basePath + "/.hoodie").mkdirs();
+    new File(basePath + "/.hoodie/hoodie.properties").createNewFile();
+    // Only first two have commit files
+    new File(basePath + "/.hoodie/" + commitTime1 + ".commit").createNewFile();
+    new File(basePath + "/.hoodie/" + commitTime2 + ".commit").createNewFile();
+    new File(basePath + "/.hoodie/" + commitTime3 + 
".inflight").createNewFile();
+
+    // Some parquet files
+    new File(basePath + "/2016/05/01/").mkdirs();
+    new File(basePath + "/2016/05/02/").mkdirs();
+    new File(basePath + "/2016/05/06/").mkdirs();
+    HoodieTestDataGenerator.writePartitionMetadata(fs, new String[]{"2016/05/01", "2016/05/02", "2016/05/06"},
+        basePath);
+    // Make commit1
+    File file11 = new File(basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, "id11"));
+    file11.createNewFile();
+    File file12 = new File(basePath + "/2016/05/02/" + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, "id12"));
+    file12.createNewFile();
+    File file13 = new File(basePath + "/2016/05/06/" + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, "id13"));
+    file13.createNewFile();
+
+    // Make commit2
+    File file21 = new File(basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, "id21"));
+    file21.createNewFile();
+    File file22 = new File(basePath + "/2016/05/02/" + FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, "id22"));
+    file22.createNewFile();
+    File file23 = new File(basePath + "/2016/05/06/" + FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, "id23"));
+    file23.createNewFile();
+
+    // Make commit3
+    File file31 = new File(basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, "id31"));
+    file31.createNewFile();
+    File file32 = new File(basePath + "/2016/05/02/" + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, "id32"));
+    file32.createNewFile();
+    File file33 = new File(basePath + "/2016/05/06/" + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, "id33"));
+    file33.createNewFile();
+
+    // Do a snapshot copy
+    HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
+    copier.snapshot(jsc, basePath, outputPath, false);
+
+    // Check results
+    assertTrue(fs.exists(new Path(outputPath + "/2016/05/01/" + file11.getName())));
+    assertTrue(fs.exists(new Path(outputPath + "/2016/05/02/" + file12.getName())));
+    assertTrue(fs.exists(new Path(outputPath + "/2016/05/06/" + file13.getName())));
+    assertTrue(fs.exists(new Path(outputPath + "/2016/05/01/" + file21.getName())));
+    assertTrue(fs.exists(new Path(outputPath + "/2016/05/02/" + file22.getName())));
+    assertTrue(fs.exists(new Path(outputPath + "/2016/05/06/" + file23.getName())));
+    assertFalse(fs.exists(new Path(outputPath + "/2016/05/01/" + file31.getName())));
+    assertFalse(fs.exists(new Path(outputPath + "/2016/05/02/" + file32.getName())));
+    assertFalse(fs.exists(new Path(outputPath + "/2016/05/06/" + file33.getName())));
+
+    assertTrue(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime1 + ".commit")));
+    assertTrue(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime2 + ".commit")));
+    assertFalse(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime3 + ".commit")));
+    assertFalse(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime3 + ".inflight")));
+    assertTrue(fs.exists(new Path(outputPath + "/.hoodie/hoodie.properties")));
+
+    assertTrue(fs.exists(new Path(outputPath + "/_SUCCESS")));
+  }
+}
