This is an automated email from the ASF dual-hosted git repository.
leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 44700d5 [HUDI-344] Hudi Dataset Snapshot Exporter (#1360)
44700d5 is described below
commit 44700d531a74f24762903df2729577a0d96e4ec0
Author: openopen2 <[email protected]>
AuthorDate: Tue Mar 10 09:17:51 2020 +0800
[HUDI-344] Hudi Dataset Snapshot Exporter (#1360)
Co-authored-by: jason1993 <[email protected]>
---
.../hudi/utilities/HoodieSnapshotExporter.java | 224 +++++++++++++++++++++
.../apache/hudi/utilities/DataSourceTestUtils.java | 50 +++++
.../hudi/utilities/TestHoodieSnapshotExporter.java | 222 ++++++++++++++++++++
3 files changed, 496 insertions(+)
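
For context, the HoodieSnapshotExporter added below is a standalone Spark utility. Besides the
spark-submit path handled by main(), it can be driven programmatically along these lines. This is
a minimal sketch only, mirroring the main() method in this commit; the source and target paths
are placeholders, not values from the commit:

    SparkSession spark = SparkSession.builder()
        .appName("Hoodie-snapshot-exporter")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .getOrCreate();
    HoodieSnapshotExporter.Config cfg = new HoodieSnapshotExporter.Config();
    cfg.sourceBasePath = "/tmp/hudi/source-table";        // placeholder source dataset path
    cfg.targetOutputPath = "/tmp/hudi/exported-snapshot"; // placeholder export location
    cfg.outputFormat = "parquet";                         // "hudi" copies files as-is; other formats rewrite through Spark
    cfg.outputPartitionField = "partition";               // optional; drives repartition()/partitionBy()
    new HoodieSnapshotExporter().export(spark, cfg);
    spark.stop();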
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
new file mode 100644
index 0000000..f785d74
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.SerializableConfiguration;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTimeline;
+import org.apache.hudi.common.table.TableFileSystemView;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.DataFrameWriter;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.execution.datasources.DataSource;
+
+import scala.Tuple2;
+import scala.collection.JavaConversions;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Export the latest records of a Hudi dataset to a set of external files (e.g., plain parquet files).
+ *
+ * @experimental This export is an experimental tool. If you want to export Hudi to Hudi, please use HoodieSnapshotCopier.
+ */
+public class HoodieSnapshotExporter {
+  private static final Logger LOG = LogManager.getLogger(HoodieSnapshotExporter.class);
+
+  public static class Config implements Serializable {
+    @Parameter(names = {"--source-base-path"}, description = "Base path for the source Hudi dataset to be snapshotted", required = true)
+    String sourceBasePath = null;
+
+    @Parameter(names = {"--target-base-path"}, description = "Base path for the target output files (snapshots)", required = true)
+    String targetOutputPath = null;
+
+    @Parameter(names = {"--output-format"}, description = "e.g. Hudi or Parquet", required = true)
+    String outputFormat;
+
+    @Parameter(names = {"--output-partition-field"}, description = "A field to be used by Spark repartitioning")
+    String outputPartitionField;
+  }
+
+  public int export(SparkSession spark, Config cfg) throws IOException {
+    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
+    FileSystem fs = FSUtils.getFs(cfg.sourceBasePath, jsc.hadoopConfiguration());
+
+    final SerializableConfiguration serConf = new SerializableConfiguration(jsc.hadoopConfiguration());
+    final HoodieTableMetaClient tableMetadata = new HoodieTableMetaClient(fs.getConf(), cfg.sourceBasePath);
+    final TableFileSystemView.BaseFileOnlyView fsView = new HoodieTableFileSystemView(tableMetadata,
+        tableMetadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants());
+    // Get the latest commit
+    Option<HoodieInstant> latestCommit =
+        tableMetadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
+    if (!latestCommit.isPresent()) {
+      LOG.error("No commits present. Nothing to snapshot");
+      return -1;
+    }
+    final String latestCommitTimestamp = latestCommit.get().getTimestamp();
+    LOG.info(String.format("Starting to snapshot latest version files which are also no later than %s.",
+        latestCommitTimestamp));
+
+    List<String> partitions = FSUtils.getAllPartitionPaths(fs, cfg.sourceBasePath, false);
+    if (partitions.size() > 0) {
+      List<String> dataFiles = new ArrayList<>();
+
+      for (String partition : partitions) {
+        dataFiles.addAll(fsView.getLatestBaseFilesBeforeOrOn(partition, latestCommitTimestamp).map(f -> f.getPath()).collect(Collectors.toList()));
+      }
+
+      try {
+        DataSource.lookupDataSource(cfg.outputFormat, spark.sessionState().conf());
+      } catch (Exception e) {
+        LOG.error(String.format("The %s output format is not supported!", cfg.outputFormat));
+        return -1;
+      }
+      if (!cfg.outputFormat.equalsIgnoreCase("hudi")) {
+        // Do transformation
+        // A field to do simple Spark repartitioning
+        DataFrameWriter<Row> write = null;
+        Dataset<Row> original = spark.read().parquet(JavaConversions.asScalaIterator(dataFiles.iterator()).toSeq());
+        List<Column> needColumns = Arrays.asList(original.columns()).stream().filter(col -> !col.startsWith("_hoodie_")).map(col -> new Column(col)).collect(Collectors.toList());
+        Dataset<Row> reader = original.select(JavaConversions.asScalaIterator(needColumns.iterator()).toSeq());
+        if (!StringUtils.isNullOrEmpty(cfg.outputPartitionField)) {
+          write = reader.repartition(new Column(cfg.outputPartitionField))
+              .write().partitionBy(cfg.outputPartitionField);
+        } else {
+          write = reader.write();
+        }
+        write.format(cfg.outputFormat)
+            .mode(SaveMode.Overwrite)
+            .save(cfg.targetOutputPath);
+      } else {
+        // No transformation is needed for output format "HUDI", just copy the original files.
+        copySnapshot(jsc, fs, cfg, partitions, dataFiles, latestCommitTimestamp, serConf);
+      }
+    } else {
+      LOG.info("The job has 0 partitions to copy.");
+    }
+    return 0;
+  }
+
+  private void copySnapshot(JavaSparkContext jsc,
+                            FileSystem fs,
+                            Config cfg,
+                            List<String> partitions,
+                            List<String> dataFiles,
+                            String latestCommitTimestamp,
+                            SerializableConfiguration serConf) throws IOException {
+    // Make sure the output directory is empty
+    Path outputPath = new Path(cfg.targetOutputPath);
+    if (fs.exists(outputPath)) {
+      LOG.warn(String.format("The output path %s already exists, deleting", outputPath));
+      fs.delete(new Path(cfg.targetOutputPath), true);
+    }
+
+    jsc.parallelize(partitions, partitions.size()).flatMap(partition -> {
+      // Only take latest version files <= latestCommit.
+      FileSystem fs1 = FSUtils.getFs(cfg.sourceBasePath, serConf.newCopy());
+      List<Tuple2<String, String>> filePaths = new ArrayList<>();
+      dataFiles.forEach(hoodieDataFile -> filePaths.add(new Tuple2<>(partition, hoodieDataFile)));
+
+      // also need to copy over partition metadata
+      Path partitionMetaFile =
+          new Path(new Path(cfg.sourceBasePath, partition), HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE);
+      if (fs1.exists(partitionMetaFile)) {
+        filePaths.add(new Tuple2<>(partition, partitionMetaFile.toString()));
+      }
+
+      return filePaths.iterator();
+    }).foreach(tuple -> {
+      String partition = tuple._1();
+      Path sourceFilePath = new Path(tuple._2());
+      Path toPartitionPath = new Path(cfg.targetOutputPath, partition);
+      FileSystem ifs = FSUtils.getFs(cfg.targetOutputPath, serConf.newCopy());
+
+      if (!ifs.exists(toPartitionPath)) {
+        ifs.mkdirs(toPartitionPath);
+      }
+      FileUtil.copy(ifs, sourceFilePath, ifs, new Path(toPartitionPath, sourceFilePath.getName()), false,
+          ifs.getConf());
+    });
+
+    // Also copy the .commit files
+    LOG.info(String.format("Copying .commit files which are no later than %s.", latestCommitTimestamp));
+    FileStatus[] commitFilesToCopy =
+        fs.listStatus(new Path(cfg.sourceBasePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME), (commitFilePath) -> {
+          if (commitFilePath.getName().equals(HoodieTableConfig.HOODIE_PROPERTIES_FILE)) {
+            return true;
+          } else {
+            String commitTime = FSUtils.getCommitFromCommitFile(commitFilePath.getName());
+            return HoodieTimeline.compareTimestamps(commitTime, latestCommitTimestamp,
+                HoodieTimeline.LESSER_OR_EQUAL);
+          }
+        });
+    for (FileStatus commitStatus : commitFilesToCopy) {
+      Path targetFilePath =
+          new Path(cfg.targetOutputPath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + commitStatus.getPath().getName());
+      if (!fs.exists(targetFilePath.getParent())) {
+        fs.mkdirs(targetFilePath.getParent());
+      }
+      if (fs.exists(targetFilePath)) {
+        LOG.error(String.format("The target output commit file (%s) already exists.", targetFilePath));
+      }
+      FileUtil.copy(fs, commitStatus.getPath(), fs, targetFilePath, false, fs.getConf());
+    }
+  }
+
+  public static void main(String[] args) throws IOException {
+    // Take input configs
+    final Config cfg = new Config();
+    new JCommander(cfg, null, args);
+
+    // Create a spark job to do the snapshot export
+    SparkSession spark = SparkSession.builder().appName("Hoodie-snapshot-exporter")
+        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").getOrCreate();
+    LOG.info("Initializing spark job.");
+
+    HoodieSnapshotExporter hoodieSnapshotExporter = new HoodieSnapshotExporter();
+    hoodieSnapshotExporter.export(spark, cfg);
+
+    // Stop the job
+    spark.stop();
+  }
+}
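
The same configuration maps onto the command-line flags declared on Config and parsed by JCommander
in main(). A sketch of an equivalent argument list, for illustration only; the paths are placeholders
and not taken from this commit:

    String[] args = new String[] {
        "--source-base-path", "/tmp/hudi/source-table",       // placeholder
        "--target-base-path", "/tmp/hudi/exported-snapshot",  // placeholder
        "--output-format", "parquet",
        "--output-partition-field", "partition"               // optional
    };
    // Equivalent to supplying the flags via spark-submit; main() builds the SparkSession itself.
    HoodieSnapshotExporter.main(args);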
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/DataSourceTestUtils.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/DataSourceTestUtils.java
new file mode 100644
index 0000000..1a96b81
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/DataSourceTestUtils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import org.apache.hudi.common.TestRawTripPayload;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Test utils for data source tests.
+ */
+public class DataSourceTestUtils {
+
+  public static Option<String> convertToString(HoodieRecord record) {
+    try {
+      String str = ((TestRawTripPayload) record.getData()).getJsonData();
+      str = "{" + str.substring(str.indexOf("\"timestamp\":"));
+      // Remove the last } bracket
+      str = str.substring(0, str.length() - 1);
+      return Option.of(str + ", \"partition\": \"" + record.getPartitionPath() + "\"}");
+    } catch (IOException e) {
+      return Option.empty();
+    }
+  }
+
+  public static List<String> convertToStringList(List<HoodieRecord> records) {
+    return records.stream().map(DataSourceTestUtils::convertToString).filter(Option::isPresent).map(Option::get)
+        .collect(Collectors.toList());
+  }
+}
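
These helpers are consumed by the new test below, which turns generated HoodieRecords into JSON strings
and loads them with spark.read().json. A small usage sketch, assuming a SparkSession (spark), a
JavaSparkContext (jsc), and a HoodieTestDataGenerator set up as in the test:

    // Sketch: generate 10 insert records and load them as a DataFrame (mirrors testSnapshotExporter()).
    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
    List<String> records = DataSourceTestUtils.convertToStringList(dataGen.generateInserts("000", 10));
    Dataset<Row> df = spark.read().json(jsc.parallelize(records, 2));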
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java
new file mode 100644
index 0000000..920f1ed
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.common.HoodieCommonTestHarness;
+import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.model.HoodieTestUtils;
+import org.apache.hudi.common.util.FSUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestHoodieSnapshotExporter extends HoodieCommonTestHarness {
+  private static String TEST_WRITE_TOKEN = "1-0-1";
+
+  private SparkSession spark = null;
+  private HoodieTestDataGenerator dataGen = null;
+  private String outputPath = null;
+  private String rootPath = null;
+  private FileSystem fs = null;
+  private Map commonOpts;
+  private HoodieSnapshotExporter.Config cfg;
+  private JavaSparkContext jsc = null;
+  @Before
+  public void initialize() throws IOException {
+    spark = SparkSession.builder()
+        .appName("Hoodie Datasource test")
+        .master("local[2]")
+        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+        .getOrCreate();
+    jsc = new JavaSparkContext(spark.sparkContext());
+    dataGen = new HoodieTestDataGenerator();
+    folder.create();
+    basePath = folder.getRoot().getAbsolutePath();
+    fs = FSUtils.getFs(basePath, spark.sparkContext().hadoopConfiguration());
+    commonOpts = new HashMap();
+
+    commonOpts.put("hoodie.insert.shuffle.parallelism", "4");
+    commonOpts.put("hoodie.upsert.shuffle.parallelism", "4");
+    commonOpts.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key");
+    commonOpts.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition");
+    commonOpts.put(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "timestamp");
+    commonOpts.put(HoodieWriteConfig.TABLE_NAME, "hoodie_test");
+
+    cfg = new HoodieSnapshotExporter.Config();
+    cfg.sourceBasePath = basePath;
+    cfg.targetOutputPath = outputPath = basePath + "/target";
+    cfg.outputFormat = "json";
+    cfg.outputPartitionField = "partition";
+  }
+
+  @After
+  public void cleanup() {
+    if (spark != null) {
+      spark.stop();
+    }
+  }
+
+ @Test
+  public void testSnapshotExporter() throws IOException {
+    // Insert Operation
+    List<String> records = DataSourceTestUtils.convertToStringList(dataGen.generateInserts("000", 100));
+    Dataset<Row> inputDF = spark.read().json(new JavaSparkContext(spark.sparkContext()).parallelize(records, 2));
+    inputDF.write().format("hudi")
+        .options(commonOpts)
+        .option(DataSourceWriteOptions.OPERATION_OPT_KEY(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL())
+        .mode(SaveMode.Overwrite)
+        .save(basePath);
+    long sourceCount = inputDF.count();
+
+    HoodieSnapshotExporter hoodieSnapshotExporter = new HoodieSnapshotExporter();
+    hoodieSnapshotExporter.export(spark, cfg);
+
+    long targetCount = spark.read().json(outputPath).count();
+
+    assertTrue(sourceCount == targetCount);
+
+    // Test Invalid OutputFormat
+    cfg.outputFormat = "foo";
+    int isError = hoodieSnapshotExporter.export(spark, cfg);
+    assertTrue(isError == -1);
+  }
+
+  // for testEmptySnapshotCopy
+  public void init() throws IOException {
+    TemporaryFolder folder = new TemporaryFolder();
+    folder.create();
+    rootPath = "file://" + folder.getRoot().getAbsolutePath();
+    basePath = rootPath + "/" + HoodieTestUtils.RAW_TRIPS_TEST_NAME;
+    outputPath = rootPath + "/output";
+
+    final Configuration hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
+    fs = FSUtils.getFs(basePath, hadoopConf);
+    HoodieTestUtils.init(hadoopConf, basePath);
+  }
+
+  @Test
+  public void testEmptySnapshotCopy() throws IOException {
+    init();
+    // There is no real data (only .hoodie directory)
+    assertEquals(fs.listStatus(new Path(basePath)).length, 1);
+    assertFalse(fs.exists(new Path(outputPath)));
+
+    // Do the snapshot
+    HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
+    copier.snapshot(jsc, basePath, outputPath, true);
+
+    // Nothing changed; we just bail out
+    assertEquals(fs.listStatus(new Path(basePath)).length, 1);
+    assertFalse(fs.exists(new Path(outputPath + "/_SUCCESS")));
+  }
+
+ // TODO - uncomment this after fixing test failures
+ // @Test
+  public void testSnapshotCopy() throws Exception {
+    // Generate some commits and corresponding parquets
+    String commitTime1 = "20160501010101";
+    String commitTime2 = "20160502020601";
+    String commitTime3 = "20160506030611";
+    new File(basePath + "/.hoodie").mkdirs();
+    new File(basePath + "/.hoodie/hoodie.properties").createNewFile();
+    // Only first two have commit files
+    new File(basePath + "/.hoodie/" + commitTime1 + ".commit").createNewFile();
+    new File(basePath + "/.hoodie/" + commitTime2 + ".commit").createNewFile();
+    new File(basePath + "/.hoodie/" + commitTime3 + ".inflight").createNewFile();
+
+    // Some parquet files
+    new File(basePath + "/2016/05/01/").mkdirs();
+    new File(basePath + "/2016/05/02/").mkdirs();
+    new File(basePath + "/2016/05/06/").mkdirs();
+    HoodieTestDataGenerator.writePartitionMetadata(fs, new String[]{"2016/05/01", "2016/05/02", "2016/05/06"},
+        basePath);
+    // Make commit1
+    File file11 = new File(basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, "id11"));
+    file11.createNewFile();
+    File file12 = new File(basePath + "/2016/05/02/" + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, "id12"));
+    file12.createNewFile();
+    File file13 = new File(basePath + "/2016/05/06/" + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, "id13"));
+    file13.createNewFile();
+
+    // Make commit2
+    File file21 = new File(basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, "id21"));
+    file21.createNewFile();
+    File file22 = new File(basePath + "/2016/05/02/" + FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, "id22"));
+    file22.createNewFile();
+    File file23 = new File(basePath + "/2016/05/06/" + FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, "id23"));
+    file23.createNewFile();
+
+    // Make commit3
+    File file31 = new File(basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, "id31"));
+    file31.createNewFile();
+    File file32 = new File(basePath + "/2016/05/02/" + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, "id32"));
+    file32.createNewFile();
+    File file33 = new File(basePath + "/2016/05/06/" + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, "id33"));
+    file33.createNewFile();
+
+    // Do a snapshot copy
+    HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
+    copier.snapshot(jsc, basePath, outputPath, false);
+
+    // Check results
+    assertTrue(fs.exists(new Path(outputPath + "/2016/05/01/" + file11.getName())));
+    assertTrue(fs.exists(new Path(outputPath + "/2016/05/02/" + file12.getName())));
+    assertTrue(fs.exists(new Path(outputPath + "/2016/05/06/" + file13.getName())));
+    assertTrue(fs.exists(new Path(outputPath + "/2016/05/01/" + file21.getName())));
+    assertTrue(fs.exists(new Path(outputPath + "/2016/05/02/" + file22.getName())));
+    assertTrue(fs.exists(new Path(outputPath + "/2016/05/06/" + file23.getName())));
+    assertFalse(fs.exists(new Path(outputPath + "/2016/05/01/" + file31.getName())));
+    assertFalse(fs.exists(new Path(outputPath + "/2016/05/02/" + file32.getName())));
+    assertFalse(fs.exists(new Path(outputPath + "/2016/05/06/" + file33.getName())));
+
+    assertTrue(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime1 + ".commit")));
+    assertTrue(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime2 + ".commit")));
+    assertFalse(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime3 + ".commit")));
+    assertFalse(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime3 + ".inflight")));
+    assertTrue(fs.exists(new Path(outputPath + "/.hoodie/hoodie.properties")));
+
+    assertTrue(fs.exists(new Path(outputPath + "/_SUCCESS")));
+  }
+}