yanghua commented on a change in pull request #1774:
URL: https://github.com/apache/hudi/pull/1774#discussion_r457278729
##########
File path:
hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieSyncCommand.java
##########
@@ -74,9 +74,9 @@ public String validateSync(
}
String targetLatestCommit =
- targetTimeline.getInstants().iterator().hasNext() ? "0" :
targetTimeline.lastInstant().get().getTimestamp();
+ targetTimeline.getInstants().iterator().hasNext() ?
targetTimeline.lastInstant().get().getTimestamp() : "0";
Review comment:
Good catch!
##########
File path:
hudi-integ-test/src/test/java/org/apache/hudi/integ/HoodieTestHiveBase.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.integ;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.util.collection.Pair;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Base class to run cmd and generate data in hive.
+ */
+public class HoodieTestHiveBase extends ITTestBase {
+
+ protected enum PartitionType {
+ SINGLE_KEY_PARTITIONED, MULTI_KEYS_PARTITIONED, NON_PARTITIONED,
+ }
+
+ /**
+ * A basic integration test that runs HoodieJavaApp to create a sample
Hoodie data-set and performs upserts on it.
+ * Hive integration and upsert functionality is checked by running a count
query in hive console. TODO: Add
+ * spark-shell test-case
+ */
+ public void generateDataByHoodieJavaApp(String hiveTableName, String
tableType, PartitionType partitionType,
+ String commitType, String hoodieTableName) throws Exception {
+
+ String hdfsPath = getHdfsPath(hiveTableName);
+ String hdfsUrl = "hdfs://namenode" + hdfsPath;
+
+ Pair<String, String> stdOutErr;
+ if ("overwrite".equals(commitType)) {
+ // Drop Table if it exists
+ try {
+ dropHiveTables(hiveTableName, tableType);
+ } catch (AssertionError ex) {
+ // In travis, sometimes, the hivemetastore is not ready even though we
wait for the port to be up
+ // Workaround to sleep for 5 secs and retry
+ // Set sleep time by hoodie.hiveserver.time.wait
+ Thread.sleep(getTimeWait());
+ dropHiveTables(hiveTableName, tableType);
+ }
+
+ // Ensure table does not exist
+ stdOutErr = executeHiveCommand("show tables like '" + hiveTableName +
"'");
+ assertTrue(stdOutErr.getLeft().isEmpty(), "Dropped table " +
hiveTableName + " exists!");
+ }
+
+ // Run Hoodie Java App
+ String cmd = String.format("%s %s --hive-sync --table-path %s --hive-url
%s --table-type %s --hive-table %s" +
+ " --commit-type %s --table-name %s", HOODIE_JAVA_APP,
"HoodieJavaGenerateApp", hdfsUrl, HIVE_SERVER_JDBC_URL,
+ tableType, hiveTableName, commitType, hoodieTableName);
+ if (partitionType == PartitionType.MULTI_KEYS_PARTITIONED) {
+ cmd = cmd + " --use-multi-partition-keys";
+ } else if (partitionType == PartitionType.NON_PARTITIONED){
+ cmd = cmd + " --non-partitioned";
+ }
+ executeCommandStringInDocker(ADHOC_1_CONTAINER, cmd, true);
+
+ String snapshotTableName = getSnapshotTableName(tableType, hiveTableName);
+
+ // Ensure table does exist
+ stdOutErr = executeHiveCommand("show tables like '" + snapshotTableName +
"'");
+ assertEquals(snapshotTableName, stdOutErr.getLeft(), "Table exists");
+ }
+
+ protected void dropHiveTables(String hiveTableName, String tableType) throws
Exception {
+ if (tableType.equals(HoodieTableType.MERGE_ON_READ.name())) {
+ executeHiveCommand("drop table if exists " + hiveTableName + "_rt");
+ executeHiveCommand("drop table if exists " + hiveTableName + "_ro");
+ } else {
+ executeHiveCommand("drop table if exists " + hiveTableName);
+ }
+ }
+
+ protected String getHdfsPath(String hiveTableName) {
Review comment:
`getHDFSPath` looks better?
##########
File path:
hudi-integ-test/src/test/java/org/apache/hudi/integ/HoodieTestHiveBase.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.integ;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.util.collection.Pair;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Base class to run cmd and generate data in hive.
+ */
+public class HoodieTestHiveBase extends ITTestBase {
+
+ protected enum PartitionType {
+ SINGLE_KEY_PARTITIONED, MULTI_KEYS_PARTITIONED, NON_PARTITIONED,
+ }
+
+ /**
+ * A basic integration test that runs HoodieJavaApp to create a sample
Hoodie data-set and performs upserts on it.
+ * Hive integration and upsert functionality is checked by running a count
query in hive console. TODO: Add
+ * spark-shell test-case
+ */
+ public void generateDataByHoodieJavaApp(String hiveTableName, String
tableType, PartitionType partitionType,
+ String commitType, String hoodieTableName) throws Exception {
+
+ String hdfsPath = getHdfsPath(hiveTableName);
+ String hdfsUrl = "hdfs://namenode" + hdfsPath;
+
+ Pair<String, String> stdOutErr;
+ if ("overwrite".equals(commitType)) {
+ // Drop Table if it exists
+ try {
+ dropHiveTables(hiveTableName, tableType);
+ } catch (AssertionError ex) {
+ // In travis, sometimes, the hivemetastore is not ready even though we
wait for the port to be up
+ // Workaround to sleep for 5 secs and retry
+ // Set sleep time by hoodie.hiveserver.time.wait
+ Thread.sleep(getTimeWait());
+ dropHiveTables(hiveTableName, tableType);
+ }
+
+ // Ensure table does not exist
+ stdOutErr = executeHiveCommand("show tables like '" + hiveTableName +
"'");
+ assertTrue(stdOutErr.getLeft().isEmpty(), "Dropped table " +
hiveTableName + " exists!");
Review comment:
I would not suggest using assertions in normal (non-test) methods. We can throw an
exception directly instead. WDYT?
##########
File path: hudi-spark/src/test/java/HoodieJavaGenerateApp.java
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.HoodieDataSourceHelpers;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.hive.MultiPartKeysValueExtractor;
+import org.apache.hudi.hive.NonPartitionedExtractor;
+import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
+import org.apache.hudi.keygen.SimpleKeyGenerator;
+import org.apache.hudi.testutils.DataSourceTestUtils;
+import org.apache.hudi.testutils.HoodieTestDataGenerator;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.DataFrameWriter;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class HoodieJavaGenerateApp {
+ @Parameter(names = {"--table-path", "-p"}, description = "path for Hoodie
sample table")
+ private String tablePath = "file:///tmp/hoodie/sample-table";
+
+ @Parameter(names = {"--table-name", "-n"}, description = "table name for
Hoodie sample table")
+ private String tableName = "hoodie_test";
+
+ @Parameter(names = {"--table-type", "-t"}, description = "One of
COPY_ON_WRITE or MERGE_ON_READ")
+ private String tableType = HoodieTableType.COPY_ON_WRITE.name();
+
+ @Parameter(names = {"--hive-sync", "-hv"}, description = "Enable syncing to
hive")
+ private Boolean enableHiveSync = false;
+
+ @Parameter(names = {"--hive-db", "-hd"}, description = "hive database")
+ private String hiveDB = "default";
+
+ @Parameter(names = {"--hive-table", "-ht"}, description = "hive table")
+ private String hiveTable = "hoodie_sample_test";
+
+ @Parameter(names = {"--hive-user", "-hu"}, description = "hive username")
+ private String hiveUser = "hive";
+
+ @Parameter(names = {"--hive-password", "-hp"}, description = "hive password")
+ private String hivePass = "hive";
+
+ @Parameter(names = {"--hive-url", "-hl"}, description = "hive JDBC URL")
+ private String hiveJdbcUrl = "jdbc:hive2://localhost:10000";
+
+ @Parameter(names = {"--non-partitioned", "-np"}, description = "Use
non-partitioned Table")
+ private Boolean nonPartitionedTable = false;
+
+ @Parameter(names = {"--use-multi-partition-keys", "-mp"}, description = "Use
Multiple Partition Keys")
+ private Boolean useMultiPartitionKeys = false;
+
+ @Parameter(names = {"--commit-type", "-ct"}, description = "How may commits
will run")
+ private String commitType = "overwrite";
+
+ @Parameter(names = {"--help", "-h"}, help = true)
+ public Boolean help = false;
+
+ private static final Logger LOG = LogManager.getLogger(HoodieJavaApp.class);
Review comment:
`HoodieJavaGenerateApp`?
##########
File path:
hudi-integ-test/src/test/java/org/apache/hudi/integ/HoodieTestHiveBase.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.integ;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.util.collection.Pair;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Base class to run cmd and generate data in hive.
+ */
+public class HoodieTestHiveBase extends ITTestBase {
+
+ protected enum PartitionType {
+ SINGLE_KEY_PARTITIONED, MULTI_KEYS_PARTITIONED, NON_PARTITIONED,
+ }
+
+ /**
+ * A basic integration test that runs HoodieJavaApp to create a sample
Hoodie data-set and performs upserts on it.
+ * Hive integration and upsert functionality is checked by running a count
query in hive console. TODO: Add
+ * spark-shell test-case
+ */
+ public void generateDataByHoodieJavaApp(String hiveTableName, String
tableType, PartitionType partitionType,
+ String commitType, String hoodieTableName) throws Exception {
+
+ String hdfsPath = getHdfsPath(hiveTableName);
+ String hdfsUrl = "hdfs://namenode" + hdfsPath;
+
+ Pair<String, String> stdOutErr;
+ if ("overwrite".equals(commitType)) {
+ // Drop Table if it exists
+ try {
+ dropHiveTables(hiveTableName, tableType);
+ } catch (AssertionError ex) {
+ // In travis, sometimes, the hivemetastore is not ready even though we
wait for the port to be up
+ // Workaround to sleep for 5 secs and retry
+ // Set sleep time by hoodie.hiveserver.time.wait
+ Thread.sleep(getTimeWait());
+ dropHiveTables(hiveTableName, tableType);
+ }
+
+ // Ensure table does not exist
+ stdOutErr = executeHiveCommand("show tables like '" + hiveTableName +
"'");
+ assertTrue(stdOutErr.getLeft().isEmpty(), "Dropped table " +
hiveTableName + " exists!");
+ }
+
+ // Run Hoodie Java App
+ String cmd = String.format("%s %s --hive-sync --table-path %s --hive-url
%s --table-type %s --hive-table %s" +
+ " --commit-type %s --table-name %s", HOODIE_JAVA_APP,
"HoodieJavaGenerateApp", hdfsUrl, HIVE_SERVER_JDBC_URL,
+ tableType, hiveTableName, commitType, hoodieTableName);
+ if (partitionType == PartitionType.MULTI_KEYS_PARTITIONED) {
+ cmd = cmd + " --use-multi-partition-keys";
+ } else if (partitionType == PartitionType.NON_PARTITIONED){
+ cmd = cmd + " --non-partitioned";
+ }
+ executeCommandStringInDocker(ADHOC_1_CONTAINER, cmd, true);
+
+ String snapshotTableName = getSnapshotTableName(tableType, hiveTableName);
+
+ // Ensure table does exist
+ stdOutErr = executeHiveCommand("show tables like '" + snapshotTableName +
"'");
+ assertEquals(snapshotTableName, stdOutErr.getLeft(), "Table exists");
+ }
+
+ protected void dropHiveTables(String hiveTableName, String tableType) throws
Exception {
+ if (tableType.equals(HoodieTableType.MERGE_ON_READ.name())) {
+ executeHiveCommand("drop table if exists " + hiveTableName + "_rt");
+ executeHiveCommand("drop table if exists " + hiveTableName + "_ro");
+ } else {
+ executeHiveCommand("drop table if exists " + hiveTableName);
+ }
+ }
+
+ protected String getHdfsPath(String hiveTableName) {
+ return "/" + hiveTableName;
+ }
+
+ protected String getSnapshotTableName(String tableType, String
hiveTableName) {
+ return tableType.equals(HoodieTableType.MERGE_ON_READ.name())
+ ? hiveTableName + "_rt" : hiveTableName;
+ }
+
+ private int getTimeWait() {
+ int timeWait = 5000;
Review comment:
Can we define a default constant for this?
##########
File path:
hudi-integ-test/src/test/java/org/apache/hudi/integ/HoodieTestHiveBase.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.integ;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.util.collection.Pair;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Base class to run cmd and generate data in hive.
+ */
+public class HoodieTestHiveBase extends ITTestBase {
+
+ protected enum PartitionType {
+ SINGLE_KEY_PARTITIONED, MULTI_KEYS_PARTITIONED, NON_PARTITIONED,
+ }
+
+ /**
+ * A basic integration test that runs HoodieJavaApp to create a sample
Hoodie data-set and performs upserts on it.
+ * Hive integration and upsert functionality is checked by running a count
query in hive console. TODO: Add
+ * spark-shell test-case
+ */
+ public void generateDataByHoodieJavaApp(String hiveTableName, String
tableType, PartitionType partitionType,
+ String commitType, String hoodieTableName) throws Exception {
+
+ String hdfsPath = getHdfsPath(hiveTableName);
+ String hdfsUrl = "hdfs://namenode" + hdfsPath;
+
+ Pair<String, String> stdOutErr;
+ if ("overwrite".equals(commitType)) {
Review comment:
Can we define a constant for this value, since it is used in multiple places?
IMO, hard-coding the same literal repeatedly is not a good practice.
##########
File path:
hudi-integ-test/src/test/java/org/apache/hudi/integ/command/ITTestHoodieSyncCommand.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.integ.command;
+
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieTableType;
+
+import org.apache.hudi.integ.HoodieTestHiveBase;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration test class for HoodieSyncCommand in hudi-cli module.
+ */
+public class ITTestHoodieSyncCommand extends HoodieTestHiveBase {
+
+ private static final String HUDI_CLI_TOOL = HOODIE_WS_ROOT +
"/hudi-cli/hudi-cli.sh";
+ private static final String SYNC_VALIDATE_COMMANDS = HOODIE_WS_ROOT +
"/docker/demo/sync-validate.commands";
+
+ @Test
+ public void testValidateSync() throws Exception {
+ String hiveTableName = "docker_hoodie_sync_valid_test";
+ String hiveTableName2 = "docker_hoodie_sync_valid_test_2";
+
+ generateDataByHoodieJavaApp(
+ hiveTableName, HoodieTableType.COPY_ON_WRITE.name(),
PartitionType.SINGLE_KEY_PARTITIONED, "overwrite", hiveTableName);
+
+ syncHoodieTable(hiveTableName2, "INSERT");
+
+ generateDataByHoodieJavaApp(
+ hiveTableName, HoodieTableType.COPY_ON_WRITE.name(),
PartitionType.SINGLE_KEY_PARTITIONED, "append", hiveTableName);
+
+ TestExecStartResultCallback result =
+ executeCommandStringInDocker(ADHOC_1_CONTAINER, HUDI_CLI_TOOL + "
--cmdfile " + SYNC_VALIDATE_COMMANDS, true);
+
+ String expected = String.format("Count difference now is (count(%s) -
count(%s) == %d. Catch up count is %d",
+ hiveTableName, hiveTableName2, 100, 200);
+ assertTrue(result.getStderr().toString().contains(expected));
+
+ dropHiveTables(hiveTableName, HoodieTableType.COPY_ON_WRITE.name());
+ dropHiveTables(hiveTableName2, HoodieTableType.COPY_ON_WRITE.name());
+ }
+
+ private void syncHoodieTable(String hiveTableName, String op) throws
Exception {
+ String cmd = "spark-submit --packages
org.apache.spark:spark-avro_2.11:2.4.4 "
Review comment:
Using `StringBuilder ` to build `cmd` is a better choice?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]