This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 238f2e1 [GOBBLIN-928] Craftsmanship cleaning and bumping up ORC version
238f2e1 is described below
commit 238f2e1cf8cc1be523cef47240c855cd596ea260
Author: autumnust <[email protected]>
AuthorDate: Sun Oct 27 23:07:07 2019 -0700
[GOBBLIN-928] Craftsmanship cleaning and bumping up ORC version
Closes #2784 from autumnust/kafkaStreamingRecipe
---
.../java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java | 3 ++-
.../main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java | 6 ++----
.../org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java | 2 +-
.../apache/gobblin/runtime/StateStoreBasedWatermarkStorage.java | 7 ++++++-
.../java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java | 9 ++++++---
gradle/scripts/dependencyDefinitions.gradle | 2 +-
6 files changed, 18 insertions(+), 11 deletions(-)
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index f3c63f9..80cbe58 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -381,7 +381,8 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
}
/**
- * Add a single {@link WorkUnit} (flattened).
+ * Add a single {@link WorkUnit} (flattened) to persistent storage so that a worker can fetch it
+ * based on the information carried in its Helix task.
*/
private void addWorkUnit(WorkUnit workUnit, ParallelRunner stateSerDeRunner, Map<String, TaskConfig> taskConfigMap) throws IOException {
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index aad3908..f3edbd1 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -87,10 +87,8 @@ import static org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTER
* {@link org.apache.gobblin.source.workunit.WorkUnit}s.
*
* <p>
- * This class presents a Helix participant and uses a {@link HelixManager} to communicate with Helix.
- * It also uses Helix task execution framework and {@link GobblinHelixTaskFactory} class to generate
- * {@link GobblinHelixTask}s which handles real Gobblin tasks. All the Helix related task framework is
- * encapsulated in {@link TaskRunnerSuiteBase}.
+ * This class represents a Helix participant that uses a {@link HelixManager} to communicate with Helix.
+ * It uses the Helix task execution framework, whose details are encapsulated in {@link TaskRunnerSuiteBase}.
* </p>
*
* <p>
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java
index 9bec51d..c798854 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java
@@ -42,7 +42,7 @@ import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PathUtils;
/**
- * A sub-type of {@link TaskRunnerSuiteBase} suite which runs tasks in a thread pool.
+ * An implementation of {@link TaskRunnerSuiteBase} that runs tasks in a thread pool.
*/
class TaskRunnerSuiteThreadModel extends TaskRunnerSuiteBase {
protected final GobblinHelixTaskFactory taskFactory;
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreBasedWatermarkStorage.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreBasedWatermarkStorage.java
index 9224b6b..651a3ce 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreBasedWatermarkStorage.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreBasedWatermarkStorage.java
@@ -50,7 +50,12 @@ public class StateStoreBasedWatermarkStorage implements WatermarkStorage {
public static final String WATERMARK_STORAGE_TYPE_KEY ="streaming.watermarkStateStore.type";
public static final String WATERMARK_STORAGE_TYPE_DEFAULT ="zk";
public static final String WATERMARK_STORAGE_CONFIG_PREFIX="streaming.watermarkStateStore.config.";
- private static final String WATERMARK_STORAGE_PREFIX="streamingWatermarks:";
+
+ /**
+ * A watermark prefix that is compatible with different watermark storage implementations.
+ * As such, this prefix should not include any characters disallowed in a {@link java.net.URI}.
+ */
+ private static final String WATERMARK_STORAGE_PREFIX="streamingWatermarks_";
public final StateStore<CheckpointableWatermarkState> _stateStore;
private final String _storeName;
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index facbdd4..673cc6c 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -347,9 +347,12 @@ public class GobblinYarnAppLauncher {
LOGGER.warn("NOT starting the admin UI because the job execution info server is NOT enabled");
}
- this.serviceManager = Optional.of(new ServiceManager(services));
- // Start all the services running in the ApplicationMaster
- this.serviceManager.get().startAsync();
+ if (services.size() > 0 ) {
+ this.serviceManager = Optional.of(new ServiceManager(services));
+ this.serviceManager.get().startAsync();
+ } else {
+ serviceManager = Optional.absent();
+ }
}
/**
diff --git a/gradle/scripts/dependencyDefinitions.gradle b/gradle/scripts/dependencyDefinitions.gradle
index f669316..83376b5 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -167,7 +167,7 @@ ext.externalDependency = [
"opencsv": "com.opencsv:opencsv:3.8",
"grok": "io.thekraken:grok:0.1.5",
"hadoopAdl" : "org.apache.hadoop:hadoop-azure-datalake:3.0.0-alpha2",
- "orcMapreduce":"org.apache.orc:orc-mapreduce:1.5.4",
+ "orcMapreduce":"org.apache.orc:orc-mapreduce:1.6.0",
'parquet': 'com.twitter:parquet-hadoop-bundle:1.5.0',
'reactivex': 'io.reactivex.rxjava2:rxjava:2.1.0',
"slf4j": [