This is an automated email from the ASF dual-hosted git repository.
vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a6390ae [HUDI-312] Make docker hdfs cluster ephemeral. This is needed
to fix flakiness in integration tests. Also, Fix DeltaStreamer hanging issue
due to uncaught exception
a6390ae is described below
commit a6390aefc41482ba224a4524fe216c277e4f5e79
Author: Balaji Varadarajan <[email protected]>
AuthorDate: Thu Oct 31 06:57:25 2019 -0700
[HUDI-312] Make docker hdfs cluster ephemeral. This is needed to fix
flakiness in integration tests. Also, Fix DeltaStreamer hanging issue due to
uncaught exception
---
docker/compose/docker-compose_hadoop284_hive233_spark231.yml | 4 ----
docker/hoodie/hadoop/base/Dockerfile | 2 +-
docker/setup_demo.sh | 6 ------
.../test/java/org/apache/hudi/integ/ITTestHoodieDemo.java | 4 ++--
.../org/apache/hudi/utilities/deltastreamer/DeltaSync.java | 5 ++++-
.../hudi/utilities/deltastreamer/HoodieDeltaStreamer.java | 12 +++++++++---
6 files changed, 16 insertions(+), 17 deletions(-)
diff --git a/docker/compose/docker-compose_hadoop284_hive233_spark231.yml
b/docker/compose/docker-compose_hadoop284_hive233_spark231.yml
index 05eb92f..8da8edb 100644
--- a/docker/compose/docker-compose_hadoop284_hive233_spark231.yml
+++ b/docker/compose/docker-compose_hadoop284_hive233_spark231.yml
@@ -21,8 +21,6 @@ services:
image: apachehudi/hudi-hadoop_2.8.4-namenode:latest
hostname: namenode
container_name: namenode
- volumes:
- - /tmp/hadoop_name:/hadoop/dfs/name
environment:
- CLUSTER_NAME=hudi_hadoop284_hive232_spark231
ports:
@@ -57,8 +55,6 @@ services:
retries: 3
depends_on:
- namenode
- volumes:
- - /tmp/hadoop_data:/hadoop/dfs/data
historyserver:
image: apachehudi/hudi-hadoop_2.8.4-history:latest
diff --git a/docker/hoodie/hadoop/base/Dockerfile
b/docker/hoodie/hadoop/base/Dockerfile
index 4e6b448..a522b0d 100644
--- a/docker/hoodie/hadoop/base/Dockerfile
+++ b/docker/hoodie/hadoop/base/Dockerfile
@@ -24,7 +24,7 @@ USER root
ENV LANG C.UTF-8
ARG HADOOP_VERSION=2.8.4
-ARG HADOOP_URL=https://www.apache.org/dist/hadoop/common/hadoop-${HADOOP_VERSION}/hadoop-${HADOOP_VERSION}.tar.gz
+ARG HADOOP_URL=https://archive.apache.org/dist/hadoop/common/hadoop-${HADOOP_VERSION}/hadoop-${HADOOP_VERSION}.tar.gz
ENV HADOOP_VERSION ${HADOOP_VERSION}
ENV HADOOP_URL ${HADOOP_URL}
diff --git a/docker/setup_demo.sh b/docker/setup_demo.sh
index 83c6200..4f40724 100755
--- a/docker/setup_demo.sh
+++ b/docker/setup_demo.sh
@@ -18,16 +18,10 @@
# limitations under the License.
################################################################################
-# Create host mount directory and copy
-mkdir -p /tmp/hadoop_name
-mkdir -p /tmp/hadoop_data
-
WS_ROOT=`dirname $PWD`
# restart cluster
HUDI_WS=${WS_ROOT} docker-compose -f compose/docker-compose_hadoop284_hive233_spark231.yml down
HUDI_WS=${WS_ROOT} docker-compose -f compose/docker-compose_hadoop284_hive233_spark231.yml pull
-rm -rf /tmp/hadoop_data/*
-rm -rf /tmp/hadoop_name/*
sleep 5
HUDI_WS=${WS_ROOT} docker-compose -f compose/docker-compose_hadoop284_hive233_spark231.yml up -d
sleep 15
diff --git
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java
index 7d079c5..e1159be 100644
--- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java
@@ -88,8 +88,8 @@ public class ITTestHoodieDemo extends ITTestBase {
}
private void setupDemo() throws Exception {
-    List<String> cmds = new ImmutableList.Builder<String>().add("hdfs dfsadmin -safemode wait") // handle NN going into
-                                                                                               // safe mode at times
+    List<String> cmds = new ImmutableList.Builder<String>()
+        .add("hdfs dfsadmin -safemode wait") // handle NN going into safe mode at times
         .add("hdfs dfs -mkdir -p " + HDFS_DATA_DIR)
         .add("hdfs dfs -copyFromLocal -f " + INPUT_BATCH_PATH1 + " " + HDFS_BATCH_PATH1)
.add("/bin/bash " + DEMO_CONTAINER_SCRIPT).build();
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 929196e..fb16268 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -30,6 +30,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
+import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
@@ -251,7 +252,9 @@ public class DeltaSync implements Serializable {
} else {
throw new HoodieDeltaStreamerException(
             "Unable to find previous checkpoint. Please double check if this table "
-                + "was indeed built via delta streamer ");
+                + "was indeed built via delta streamer. Last Commit :" + lastCommit + ", Instants :"
+                + commitTimelineOpt.get().getInstants().collect(Collectors.toList()) + ", CommitMetadata="
+                + commitMetadata.toJsonString());
}
}
} else {
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 3a6baa5..4b90d90 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -117,9 +117,15 @@ public class HoodieDeltaStreamer implements Serializable {
log.info("Delta Sync shutting down");
} else {
log.info("Delta Streamer running only single round");
- deltaSyncService.getDeltaSync().syncOnce();
- deltaSyncService.close();
- log.info("Shut down deltastreamer");
+ try {
+ deltaSyncService.getDeltaSync().syncOnce();
+ } catch (Exception ex) {
+ log.error("Got error running delta sync once. Shutting down", ex);
+ throw ex;
+ } finally {
+ deltaSyncService.close();
+ log.info("Shut down deltastreamer");
+ }
}
}