This is an automated email from the ASF dual-hosted git repository.
xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 37556f2 [BEAM-6994] SamzaRunner: further improvements for upgrading
Samza (#8221)
37556f2 is described below
commit 37556f2cd3ce32e23e16cb653632eafd7169c8e2
Author: xinyuiscool <[email protected]>
AuthorDate: Mon Apr 8 18:19:27 2019 -0700
[BEAM-6994] SamzaRunner: further improvements for upgrading Samza (#8221)
---
runners/samza/job-server/build.gradle | 49 ++++++++++++
.../samza/SamzaPipelineLifeCycleListener.java | 44 +++++++++++
.../beam/runners/samza/SamzaPipelineOptions.java | 9 +++
.../beam/runners/samza/SamzaPipelineResult.java | 12 ++-
.../org/apache/beam/runners/samza/SamzaRunner.java | 87 +++++++++++++++++++---
.../runners/samza/adapter/BoundedSourceSystem.java | 4 +-
.../samza/adapter/UnboundedSourceSystem.java | 5 +-
.../apache/beam/runners/samza/runtime/DoFnOp.java | 12 ++-
.../beam/runners/samza/runtime/GroupByKeyOp.java | 14 +++-
.../runners/samza/runtime/SamzaDoFnRunners.java | 3 +-
.../samza/runtime/SamzaStoreStateInternals.java | 22 +++---
.../samza/runtime/SamzaTimerInternalsFactory.java | 18 ++++-
.../runners/samza/translation/ConfigBuilder.java | 84 ++++++++++++---------
.../samza/translation/GroupByKeyTranslator.java | 33 +++++---
.../samza/translation/ImpulseTranslator.java | 7 +-
.../translation/ParDoBoundMultiTranslator.java | 9 +++
.../SamzaPortablePipelineTranslator.java | 6 +-
.../translation/TransformConfigGenerator.java | 4 +-
.../samza/util/SamzaPipelineTranslatorUtils.java | 17 +++++
.../runtime/SamzaTimerInternalsFactoryTest.java | 4 +-
settings.gradle | 2 +
21 files changed, 361 insertions(+), 84 deletions(-)
diff --git a/runners/samza/job-server/build.gradle
b/runners/samza/job-server/build.gradle
new file mode 100644
index 0000000..a8ab8b9
--- /dev/null
+++ b/runners/samza/job-server/build.gradle
@@ -0,0 +1,49 @@
+import org.apache.beam.gradle.BeamModulePlugin
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+apply plugin: 'org.apache.beam.module'
+apply plugin: 'application'
+// we need to set mainClassName before applying shadow plugin
+mainClassName = "org.apache.beam.runners.samza.SamzaJobServerDriver"
+
+repositories {
+ maven {
+ url "https://artifactory.corp.linkedin.com:8083/artifactory/DDS/"
+ }
+}
+
+applyJavaNature(
+ validateShadowJar: false,
+ exportJavadoc: false,
+ shadowClosure: {
+ append "reference.conf"
+ },
+)
+
+dependencies {
+ compile project(path: ":beam-runners-samza", configuration: "shadow")
+ compile group: "org.slf4j", name: "jcl-over-slf4j", version:
dependencies.create(project.library.java.slf4j_api).getVersion()
+ compile library.java.slf4j_simple
+ shadow library.java.guava
+}
+
+runShadow {
+ args = []
+}
\ No newline at end of file
diff --git
a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineLifeCycleListener.java
b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineLifeCycleListener.java
new file mode 100644
index 0000000..9585d77
--- /dev/null
+++
b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineLifeCycleListener.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.context.ExternalContext;
+
+/** Life cycle listener for a Samza pipeline during runtime. */
+public interface SamzaPipelineLifeCycleListener {
+ /** Callback after the pipeline config has been created. */
+ void onInit(Config config);
+
+ /** Callback before the pipeline is started; the returned context is passed to the runner. */
+ ExternalContext onStart();
+
+ /**
+ * Callback after the pipeline is submitted. This will be invoked only for
Samza jobs submitted
+ * to a cluster.
+ */
+ void onSubmit();
+
+ /** Callback after the pipeline is finished. */
+ void onFinish();
+
+ /** A registrar for {@link SamzaPipelineLifeCycleListener}. */
+ interface Registrar {
+ SamzaPipelineLifeCycleListener getLifeCycleListener();
+ }
+}
diff --git
a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java
b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java
index 10c5351..5ef07cf 100644
---
a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java
+++
b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java
@@ -17,12 +17,15 @@
*/
package org.apache.beam.runners.samza;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.samza.config.ConfigFactory;
import org.apache.samza.config.factories.PropertiesConfigFactory;
+import org.apache.samza.metrics.MetricsReporter;
/** Options which can be used to configure a Samza PortablePipelineRunner. */
public interface SamzaPipelineOptions extends PipelineOptions {
@@ -102,4 +105,10 @@ public interface SamzaPipelineOptions extends
PipelineOptions {
int getTimerBufferSize();
void setTimerBufferSize(int timerBufferSize);
+
+ @JsonIgnore
+ @Description("The metrics reporters that will be used to emit metrics.")
+ List<MetricsReporter> getMetricsReporters();
+
+ void setMetricsReporters(List<MetricsReporter> reporters);
}
diff --git
a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineResult.java
b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineResult.java
index db71dc9..063f0b4 100644
---
a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineResult.java
+++
b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineResult.java
@@ -38,12 +38,17 @@ public class SamzaPipelineResult implements PipelineResult {
private final SamzaExecutionContext executionContext;
private final ApplicationRunner runner;
private final StreamApplication app;
+ private final SamzaPipelineLifeCycleListener listener;
public SamzaPipelineResult(
- StreamApplication app, ApplicationRunner runner, SamzaExecutionContext
executionContext) {
+ StreamApplication app,
+ ApplicationRunner runner,
+ SamzaExecutionContext executionContext,
+ SamzaPipelineLifeCycleListener listener) {
this.executionContext = executionContext;
this.runner = runner;
this.app = app;
+ this.listener = listener;
}
@Override
@@ -70,6 +75,11 @@ public class SamzaPipelineResult implements PipelineResult {
}
final StateInfo stateInfo = getStateInfo();
+
+ if (listener != null && (stateInfo.state == State.DONE || stateInfo.state
== State.FAILED)) {
+ listener.onFinish();
+ }
+
if (stateInfo.state == State.FAILED) {
throw stateInfo.error;
}
diff --git
a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java
b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java
index 3589edb..36d47a8 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java
@@ -17,7 +17,11 @@
*/
package org.apache.beam.runners.samza;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
+import java.util.ServiceLoader;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.samza.translation.ConfigBuilder;
import org.apache.beam.runners.samza.translation.PViewToIdMapper;
@@ -32,7 +36,12 @@ import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterators;
import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.context.ExternalContext;
+import org.apache.samza.metrics.MetricsReporter;
+import org.apache.samza.metrics.MetricsReporterFactory;
import org.apache.samza.runtime.ApplicationRunner;
import org.apache.samza.runtime.ApplicationRunners;
import org.slf4j.Logger;
@@ -51,14 +60,27 @@ public class SamzaRunner extends
PipelineRunner<SamzaPipelineResult> {
}
private final SamzaPipelineOptions options;
+ private final SamzaPipelineLifeCycleListener listener;
private SamzaRunner(SamzaPipelineOptions options) {
this.options = options;
+ final Iterator<SamzaPipelineLifeCycleListener.Registrar> listenerReg =
+
ServiceLoader.load(SamzaPipelineLifeCycleListener.Registrar.class).iterator();
+ this.listener =
+ listenerReg.hasNext() ?
Iterators.getOnlyElement(listenerReg).getLifeCycleListener() : null;
}
- SamzaPipelineResult runPortablePipeline(RunnerApi.Pipeline pipeline) {
- ConfigBuilder configBuilder = new ConfigBuilder(options);
- SamzaPortablePipelineTranslator.createConfig(pipeline, configBuilder);
+ public SamzaPipelineResult runPortablePipeline(RunnerApi.Pipeline pipeline) {
+ final ConfigBuilder configBuilder = new ConfigBuilder(options);
+ SamzaPortablePipelineTranslator.createConfig(pipeline, configBuilder,
options);
+
+ final Config config = configBuilder.build();
+ options.setConfigOverride(config);
+
+ if (listener != null) {
+ listener.onInit(config);
+ }
+
final SamzaExecutionContext executionContext = new
SamzaExecutionContext(options);
final StreamApplication app =
appDescriptor -> {
@@ -66,11 +88,8 @@ public class SamzaRunner extends
PipelineRunner<SamzaPipelineResult> {
SamzaPortablePipelineTranslator.translate(
pipeline, new PortableTranslationContext(appDescriptor,
options));
};
- final ApplicationRunner runner =
- ApplicationRunners.getApplicationRunner(app, configBuilder.build());
- final SamzaPipelineResult result = new SamzaPipelineResult(app, runner,
executionContext);
- runner.run();
- return result;
+
+ return runSamzaApp(app, config, executionContext);
}
@Override
@@ -91,19 +110,63 @@ public class SamzaRunner extends
PipelineRunner<SamzaPipelineResult> {
final ConfigBuilder configBuilder = new ConfigBuilder(options);
SamzaPipelineTranslator.createConfig(pipeline, options, idMap,
configBuilder);
+
+ final Config config = configBuilder.build();
+ options.setConfigOverride(config);
+
+ if (listener != null) {
+ listener.onInit(config);
+ }
+
final SamzaExecutionContext executionContext = new
SamzaExecutionContext(options);
+ final Map<String, MetricsReporterFactory> reporterFactories =
getMetricsReporters();
final StreamApplication app =
appDescriptor -> {
appDescriptor.withApplicationContainerContextFactory(executionContext.new
Factory());
+ appDescriptor.withMetricsReporterFactories(reporterFactories);
+
SamzaPipelineTranslator.translate(
pipeline, new TranslationContext(appDescriptor, idMap, options));
};
- final ApplicationRunner runner =
- ApplicationRunners.getApplicationRunner(app, configBuilder.build());
- final SamzaPipelineResult result = new SamzaPipelineResult(app, runner,
executionContext);
- runner.run();
+ return runSamzaApp(app, config, executionContext);
+ }
+
+ private Map<String, MetricsReporterFactory> getMetricsReporters() {
+ if (options.getMetricsReporters() != null) {
+ final Map<String, MetricsReporterFactory> reporters = new HashMap<>();
+ for (int i = 0; i < options.getMetricsReporters().size(); i++) {
+ final String name = "beam-metrics-reporter-" + i;
+ final MetricsReporter reporter = options.getMetricsReporters().get(i);
+
+ reporters.put(name, (MetricsReporterFactory) (nm, processorId, config)
-> reporter);
+ }
+ return reporters;
+ } else {
+ return Collections.emptyMap();
+ }
+ }
+
+ private SamzaPipelineResult runSamzaApp(
+ StreamApplication app, Config config, SamzaExecutionContext
executionContext) {
+
+ final ApplicationRunner runner =
ApplicationRunners.getApplicationRunner(app, config);
+ final SamzaPipelineResult result =
+ new SamzaPipelineResult(app, runner, executionContext, listener);
+
+ ExternalContext externalContext = null;
+ if (listener != null) {
+ externalContext = listener.onStart();
+ }
+
+ runner.run(externalContext);
+
+ if (listener != null
+ && options.getSamzaExecutionEnvironment() ==
SamzaExecutionEnvironment.YARN) {
+ listener.onSubmit();
+ }
+
return result;
}
}
diff --git
a/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/BoundedSourceSystem.java
b/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/BoundedSourceSystem.java
index dfcea8b..1d776b8 100644
---
a/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/BoundedSourceSystem.java
+++
b/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/BoundedSourceSystem.java
@@ -68,6 +68,7 @@ import org.slf4j.LoggerFactory;
*/
// TODO: instrumentation for the consumer
public class BoundedSourceSystem {
+ private static final Logger LOG =
LoggerFactory.getLogger(BoundedSourceSystem.class);
private static <T> List<BoundedSource<T>> split(
BoundedSource<T> source, SamzaPipelineOptions pipelineOptions) throws
Exception {
@@ -417,7 +418,8 @@ public class BoundedSourceSystem {
@Override
public SystemProducer getProducer(String systemName, Config config,
MetricsRegistry registry) {
- throw new UnsupportedOperationException("Cannot create a producer for an
input system");
+ LOG.info("System " + systemName + " does not have producer.");
+ return null;
}
@Override
diff --git
a/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystem.java
b/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystem.java
index a3f958b..f5066cd 100644
---
a/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystem.java
+++
b/runners/samza/src/main/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystem.java
@@ -71,6 +71,8 @@ import org.slf4j.LoggerFactory;
* into partitions. Samza creates the job model by assigning partitions to
Samza tasks.
*/
public class UnboundedSourceSystem {
+ private static final Logger LOG =
LoggerFactory.getLogger(UnboundedSourceSystem.class);
+
// A dummy message used to force the consumer to wake up immediately and
check the
// lastException field, which will be populated.
private static final IncomingMessageEnvelope CHECK_LAST_EXCEPTION_ENVELOPE =
@@ -460,7 +462,8 @@ public class UnboundedSourceSystem {
@Override
public SystemProducer getProducer(String systemName, Config config,
MetricsRegistry registry) {
- throw new UnsupportedOperationException("Cannot create a producer for an
input system");
+ LOG.info("System " + systemName + " does not have producer.");
+ return null;
}
@Override
diff --git
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
index eb1d997..d986b15 100644
---
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
+++
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
@@ -49,6 +49,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
@@ -74,8 +75,10 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT,
OutT, Void> {
// NOTE: we use HashMap here to guarantee Serializability
private final HashMap<String, PCollectionView<?>> idToViewMap;
private final String stepName;
+ private final String stepId;
private final Coder<InT> inputCoder;
private final HashMap<TupleTag<?>, Coder<?>> outputCoders;
+ private final PCollection.IsBounded isBounded;
// portable api related
private final boolean isPortable;
@@ -114,6 +117,8 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT,
OutT, Void> {
Map<String, PCollectionView<?>> idToViewMap,
OutputManagerFactory<OutT> outputManagerFactory,
String stepName,
+ String stepId,
+ PCollection.IsBounded isBounded,
boolean isPortable,
RunnerApi.ExecutableStagePayload stagePayload,
Map<String, TupleTag<?>> idToTupleTagMap,
@@ -128,7 +133,9 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT,
OutT, Void> {
this.idToViewMap = new HashMap<>(idToViewMap);
this.outputManagerFactory = outputManagerFactory;
this.stepName = stepName;
+ this.stepId = stepId;
this.keyCoder = keyCoder;
+ this.isBounded = isBounded;
this.isPortable = isPortable;
this.stagePayload = stagePayload;
this.idToTupleTagMap = new HashMap<>(idToTupleTagMap);
@@ -152,9 +159,10 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT,
OutT, Void> {
.get()
.as(SamzaPipelineOptions.class);
+ final String stateId = "pardo-" + stepId;
final SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory =
SamzaStoreStateInternals.createStateInternalFactory(
- null, context.getTaskContext(), pipelineOptions, signature,
mainOutputTag);
+ stateId, null, context.getTaskContext(), pipelineOptions,
signature);
this.timerInternalsFactory =
SamzaTimerInternalsFactory.createTimerInternalFactory(
@@ -163,6 +171,7 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT,
OutT, Void> {
getTimerStateId(signature),
nonKeyedStateInternalsFactory,
windowingStrategy,
+ isBounded,
pipelineOptions);
this.sideInputHandler =
@@ -188,6 +197,7 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT,
OutT, Void> {
doFn,
windowingStrategy,
stepName,
+ stateId,
context,
mainOutputTag,
sideInputHandler,
diff --git
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java
index f593ecc..4748ebb 100644
---
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java
+++
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java
@@ -43,6 +43,7 @@ import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.samza.config.Config;
@@ -65,6 +66,8 @@ public class GroupByKeyOp<K, InputT, OutputT>
private final Coder<K> keyCoder;
private final SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> reduceFn;
private final String stepName;
+ private final String stepId;
+ private final IsBounded isBounded;
private transient StateInternalsFactory<K> stateInternalsFactory;
private transient SamzaTimerInternalsFactory<K> timerInternalsFactory;
@@ -77,11 +80,15 @@ public class GroupByKeyOp<K, InputT, OutputT>
SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> reduceFn,
WindowingStrategy<?, BoundedWindow> windowingStrategy,
OutputManagerFactory<KV<K, OutputT>> outputManagerFactory,
- String stepName) {
+ String stepName,
+ String stepId,
+ IsBounded isBounded) {
this.mainOutputTag = mainOutputTag;
this.windowingStrategy = windowingStrategy;
this.outputManagerFactory = outputManagerFactory;
this.stepName = stepName;
+ this.stepId = stepId;
+ this.isBounded = isBounded;
if (!(inputCoder instanceof KeyedWorkItemCoder)) {
throw new IllegalArgumentException(
@@ -108,13 +115,13 @@ public class GroupByKeyOp<K, InputT, OutputT>
final SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory =
SamzaStoreStateInternals.createStateInternalFactory(
- null, context.getTaskContext(), pipelineOptions, null,
mainOutputTag);
+ stepId, null, context.getTaskContext(), pipelineOptions, null);
final DoFnRunners.OutputManager outputManager =
outputManagerFactory.create(emitter);
this.stateInternalsFactory =
new SamzaStoreStateInternals.Factory<>(
- mainOutputTag.getId(),
+ stepId,
Collections.singletonMap(
SamzaStoreStateInternals.BEAM_STORE,
SamzaStoreStateInternals.getBeamStore(context.getTaskContext())),
@@ -128,6 +135,7 @@ public class GroupByKeyOp<K, InputT, OutputT>
TIMER_STATE_ID,
nonKeyedStateInternalsFactory,
windowingStrategy,
+ isBounded,
pipelineOptions);
final DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> doFn =
diff --git
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
index 692b664..0aa95a8 100644
---
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
+++
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
@@ -61,6 +61,7 @@ public class SamzaDoFnRunners {
DoFn<InT, FnOutT> doFn,
WindowingStrategy<?, ?> windowingStrategy,
String stepName,
+ String stateId,
Context context,
TupleTag<FnOutT> mainOutputTag,
SideInputHandler sideInputHandler,
@@ -77,7 +78,7 @@ public class SamzaDoFnRunners {
final DoFnSignature signature =
DoFnSignatures.getSignature(doFn.getClass());
final SamzaStoreStateInternals.Factory<?> stateInternalsFactory =
SamzaStoreStateInternals.createStateInternalFactory(
- keyCoder, context.getTaskContext(), pipelineOptions, signature,
mainOutputTag);
+ stateId, keyCoder, context.getTaskContext(), pipelineOptions,
signature);
final SamzaExecutionContext executionContext =
(SamzaExecutionContext) context.getApplicationContainerContext();
diff --git
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
index 620aae2..5b266bf 100644
---
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
+++
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
@@ -30,6 +30,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.StateInternals;
@@ -60,7 +61,6 @@ import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
-import org.apache.beam.sdk.values.TupleTag;
import
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v20_0.com.google.common.primitives.Ints;
@@ -82,16 +82,19 @@ public class SamzaStoreStateInternals<K> implements
StateInternals {
private final K key;
private final byte[] keyBytes;
private final int batchGetSize;
+ private final String stageId;
private SamzaStoreStateInternals(
Map<String, KeyValueStore<byte[], byte[]>> stores,
@Nullable K key,
@Nullable byte[] keyBytes,
+ String stageId,
int batchGetSize) {
this.stores = stores;
this.key = key;
this.keyBytes = keyBytes;
this.batchGetSize = batchGetSize;
+ this.stageId = stageId;
}
@SuppressWarnings("unchecked")
@@ -100,11 +103,11 @@ public class SamzaStoreStateInternals<K> implements
StateInternals {
}
static Factory createStateInternalFactory(
+ String id,
Coder<?> keyCoder,
TaskContext context,
SamzaPipelineOptions pipelineOptions,
- DoFnSignature signature,
- TupleTag<?> mainOutputTag) {
+ DoFnSignature signature) {
final int batchGetSize = pipelineOptions.getStoreBatchGetSize();
final Map<String, KeyValueStore<byte[], byte[]>> stores = new HashMap<>();
stores.put(BEAM_STORE, getBeamStore(context));
@@ -121,12 +124,7 @@ public class SamzaStoreStateInternals<K> implements
StateInternals {
} else {
stateKeyCoder = VoidCoder.of();
}
- return new Factory<>(
- // TODO: ??? what to do with empty output?
- mainOutputTag == null ? "null" : mainOutputTag.getId(),
- stores,
- stateKeyCoder,
- batchGetSize);
+ return new Factory<>(Objects.toString(id), stores, stateKeyCoder,
batchGetSize);
}
@Override
@@ -231,7 +229,6 @@ public class SamzaStoreStateInternals<K> implements
StateInternals {
final DataOutputStream dos = new DataOutputStream(baos);
try {
- dos.writeUTF(stageId);
if (key != null) {
keyCoder.encode(key, baos);
}
@@ -244,7 +241,7 @@ public class SamzaStoreStateInternals<K> implements
StateInternals {
throw new RuntimeException("Cannot encode key for state store", e);
}
- return new SamzaStoreStateInternals<>(stores, key, baos.toByteArray(),
batchGetSize);
+ return new SamzaStoreStateInternals<>(stores, key, baos.toByteArray(),
stageId, batchGetSize);
}
}
@@ -273,7 +270,8 @@ public class SamzaStoreStateInternals<K> implements
StateInternals {
dos.writeUTF(namespace.stringKey());
if (userStore == null) {
- // for system state, we need to differentiate based on the address
+ // for system state, we need to differentiate based on the stage id and the state address
+ dos.writeUTF(stageId);
dos.writeUTF(address.getId());
}
} catch (IOException e) {
diff --git
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
index d014a1e..146a9a8 100644
---
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
+++
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
@@ -36,6 +36,8 @@ import org.apache.beam.runners.samza.state.SamzaSetState;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.samza.operators.Scheduler;
import org.joda.time.Instant;
@@ -55,6 +57,7 @@ public class SamzaTimerInternalsFactory<K> implements
TimerInternalsFactory<K> {
private final Scheduler<KeyedTimerData<K>> timerRegistry;
private final int timerBufferSize;
private final SamzaTimerState state;
+ private final IsBounded isBounded;
private Instant inputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
private Instant outputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
@@ -65,12 +68,14 @@ public class SamzaTimerInternalsFactory<K> implements
TimerInternalsFactory<K> {
int timerBufferSize,
String timerStateId,
SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory,
- Coder<BoundedWindow> windowCoder) {
+ Coder<BoundedWindow> windowCoder,
+ IsBounded isBounded) {
this.keyCoder = keyCoder;
this.timerRegistry = timerRegistry;
this.timerBufferSize = timerBufferSize;
this.eventTimeTimers = new TreeSet<>();
this.state = new SamzaTimerState(timerStateId,
nonKeyedStateInternalsFactory, windowCoder);
+ this.isBounded = isBounded;
}
static <K> SamzaTimerInternalsFactory<K> createTimerInternalFactory(
@@ -79,6 +84,7 @@ public class SamzaTimerInternalsFactory<K> implements
TimerInternalsFactory<K> {
String timerStateId,
SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory,
WindowingStrategy<?, BoundedWindow> windowingStrategy,
+ IsBounded isBounded,
SamzaPipelineOptions pipelineOptions) {
final Coder<BoundedWindow> windowCoder =
windowingStrategy.getWindowFn().windowCoder();
@@ -89,7 +95,8 @@ public class SamzaTimerInternalsFactory<K> implements
TimerInternalsFactory<K> {
pipelineOptions.getTimerBufferSize(),
timerStateId,
nonKeyedStateInternalsFactory,
- windowCoder);
+ windowCoder,
+ isBounded);
}
@Override
@@ -183,6 +190,13 @@ public class SamzaTimerInternalsFactory<K> implements
TimerInternalsFactory<K> {
@Override
public void setTimer(TimerData timerData) {
+ if (isBounded == IsBounded.UNBOUNDED
+ && timerData.getTimestamp().getMillis()
+ >= GlobalWindow.INSTANCE.maxTimestamp().getMillis()) {
+ // No need to register a timer of max timestamp if the input is
unbounded
+ return;
+ }
+
final KeyedTimerData<K> keyedTimerData = new KeyedTimerData<>(keyBytes,
key, timerData);
// persist it first
diff --git
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java
index 5d90009..42a0d97 100644
---
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java
+++
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java
@@ -28,7 +28,7 @@ import
org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.core.serialization.Base64Serializer;
import org.apache.beam.runners.samza.SamzaExecutionEnvironment;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
-import org.apache.beam.runners.samza.SamzaRunnerOverrideConfigs;
+import org.apache.beam.runners.samza.container.BeamContainerRunner;
import
org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.StringUtils;
@@ -48,9 +48,13 @@ import org.apache.samza.runtime.RemoteApplicationRunner;
import org.apache.samza.serializers.ByteSerdeFactory;
import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
import org.apache.samza.zk.ZkJobCoordinatorFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/** Builder class to generate configs for BEAM samza runner during runtime. */
public class ConfigBuilder {
+ private static final Logger LOG =
LoggerFactory.getLogger(ConfigBuilder.class);
+
private static final String APP_RUNNER_CLASS = "app.runner.class";
private static final String YARN_PACKAGE_PATH = "yarn.package.path";
private static final String JOB_FACTORY_CLASS = "job.factory.class";
@@ -72,7 +76,8 @@ public class ConfigBuilder {
public Config build() {
try {
- config.putAll(systemStoreConfig(options));
+ // apply framework configs
+ config.putAll(createSystemConfig(options));
// apply user configs
config.putAll(createUserConfig(options));
@@ -84,8 +89,7 @@ public class ConfigBuilder {
"beamPipelineOptions",
Base64Serializer.serializeUnchecked(new
SerializablePipelineOptions(options)));
- // TODO: remove after we sort out Samza task wrapper
- config.put("samza.li.task.wrapper.enabled", "false");
+ validateConfigs(options, config);
return new MapConfig(config);
} catch (Exception e) {
@@ -93,14 +97,6 @@ public class ConfigBuilder {
}
}
- private static boolean isEmptyUserConfig(Map<String, String> config) {
- if (config == null) {
- return true;
- }
- return config.keySet().stream()
- .allMatch(key ->
key.startsWith(SamzaRunnerOverrideConfigs.BEAM_RUNNER_CONFIG_PREFIX));
- }
-
private static Map<String, String> createUserConfig(SamzaPipelineOptions
options)
throws Exception {
final Map<String, String> config = new HashMap<>();
@@ -110,11 +106,15 @@ public class ConfigBuilder {
// If user provides a config file, use it as base configs.
if (StringUtils.isNoneEmpty(configFilePath)) {
+ LOG.info("configFilePath: " + configFilePath);
+
final File configFile = new File(configFilePath);
final URI configUri = configFile.toURI();
final ConfigFactory configFactory =
options.getConfigFactory().getDeclaredConstructor().newInstance();
+ LOG.info("configFactory: " + configFactory.getClass().getName());
+
// Config file must exist for default properties config
// TODO: add check to all non-empty files once we don't need to
// pass the command-line args through the containers
@@ -129,19 +129,6 @@ public class ConfigBuilder {
config.putAll(options.getConfigOverride());
}
- switch (options.getSamzaExecutionEnvironment()) {
- case YARN:
- config.putAll(yarnRunConfig());
- validateYarnRun(config);
- break;
- case STANDALONE:
- config.putAll(standAloneRunConfig());
- validateZKStandAloneRun(config);
- break;
- default: // LOCAL
- config.putAll(localRunConfig());
- }
-
return config;
}
@@ -183,9 +170,11 @@ public class ConfigBuilder {
"Config %s not found for %s Deployment",
YARN_PACKAGE_PATH,
SamzaExecutionEnvironment.YARN);
+ final String appRunner = config.get(APP_RUNNER_CLASS);
checkArgument(
- !config.containsKey(APP_RUNNER_CLASS)
- ||
config.get(APP_RUNNER_CLASS).equals(RemoteApplicationRunner.class.getName()),
+ appRunner == null
+ || RemoteApplicationRunner.class.getName().equals(appRunner)
+ || BeamContainerRunner.class.getName().equals(appRunner),
"Config %s must be set to %s for %s Deployment",
APP_RUNNER_CLASS,
RemoteApplicationRunner.class.getName(),
@@ -195,12 +184,6 @@ public class ConfigBuilder {
"Config %s not found for %s Deployment",
JOB_FACTORY_CLASS,
SamzaExecutionEnvironment.YARN);
- checkArgument(
- config.get(JOB_FACTORY_CLASS).equals(YarnJobFactory.class.getName()),
- "Config %s must be set to %s for %s Deployment",
- JOB_FACTORY_CLASS,
- YarnJobFactory.class.getName(),
- SamzaExecutionEnvironment.STANDALONE);
}
@VisibleForTesting
@@ -240,7 +223,7 @@ public class ConfigBuilder {
.build();
}
- private static Map<String, String> systemStoreConfig(SamzaPipelineOptions
options) {
+ private static Map<String, String> createSystemConfig(SamzaPipelineOptions
options) {
ImmutableMap.Builder<String, String> configBuilder =
ImmutableMap.<String, String>builder()
.put(
@@ -251,13 +234,46 @@ public class ConfigBuilder {
.put("serializers.registry.byteSerde.class",
ByteSerdeFactory.class.getName());
if (options.getStateDurable()) {
+ LOG.info("stateDurable is enabled");
configBuilder.put("stores.beamStore.changelog",
getChangelogTopic(options, "beamStore"));
configBuilder.put("job.host-affinity.enabled", "true");
}
+ LOG.info("Execution environment is " +
options.getSamzaExecutionEnvironment());
+ switch (options.getSamzaExecutionEnvironment()) {
+ case YARN:
+ configBuilder.putAll(yarnRunConfig());
+ break;
+ case STANDALONE:
+ configBuilder.putAll(standAloneRunConfig());
+ break;
+ default: // LOCAL
+ configBuilder.putAll(localRunConfig());
+ break;
+ }
+
+ // TODO: remove after we sort out Samza task wrapper
+ configBuilder.put("samza.li.task.wrapper.enabled", "false");
+
return configBuilder.build();
}
+ private static void validateConfigs(SamzaPipelineOptions options,
Map<String, String> config) {
+
+ // validate execution environment
+ switch (options.getSamzaExecutionEnvironment()) {
+ case YARN:
+ validateYarnRun(config);
+ break;
+ case STANDALONE:
+ validateZKStandAloneRun(config);
+ break;
+ default:
+ // do nothing
+ break;
+ }
+ }
+
static String getChangelogTopic(SamzaPipelineOptions options, String
storeName) {
return String.format(
"%s-%s-%s-changelog", options.getJobName(), options.getJobInstance(),
storeName);
diff --git
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/GroupByKeyTranslator.java
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/GroupByKeyTranslator.java
index 13c6f11..0af0df9 100644
---
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/GroupByKeyTranslator.java
+++
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/GroupByKeyTranslator.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.samza.translation;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.KeyedWorkItemCoder;
import org.apache.beam.runners.core.SystemReduceFn;
@@ -83,7 +84,7 @@ class GroupByKeyTranslator<K, InputT, OutputT>
final SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> reduceFn =
getSystemReduceFn(transform, input.getPipeline(), kvInputCoder);
- MessageStream<OpMessage<KV<K, OutputT>>> outputStream =
+ final MessageStream<OpMessage<KV<K, OutputT>>> outputStream =
doTranslateGBK(
inputStream,
needRepartition(node, ctx),
@@ -93,7 +94,8 @@ class GroupByKeyTranslator<K, InputT, OutputT>
elementCoder,
ctx.getCurrentTopologicalId(),
node.getFullName(),
- outputTag);
+ outputTag,
+ input.isBounded());
ctx.registerMessageStream(output, outputStream);
}
@@ -114,12 +116,12 @@ class GroupByKeyTranslator<K, InputT, OutputT>
ctx.getOneInputMessageStream(transform);
final boolean needRepartition =
ctx.getSamzaPipelineOptions().getMaxSourceParallelism() > 1;
final WindowingStrategy<?, BoundedWindow> windowingStrategy =
- SamzaPipelineTranslatorUtils.getPortableWindowStrategy(transform,
pipeline);
- Coder<BoundedWindow> windowCoder =
windowingStrategy.getWindowFn().windowCoder();
+ ctx.getPortableWindowStrategy(transform, pipeline);
+ final Coder<BoundedWindow> windowCoder =
windowingStrategy.getWindowFn().windowCoder();
- String inputId = ctx.getInputId(transform);
- WindowedValue.WindowedValueCoder<KV<K, InputT>> windowedInputCoder =
- SamzaPipelineTranslatorUtils.instantiateCoder(inputId,
pipeline.getComponents());
+ final String inputId = ctx.getInputId(transform);
+ final WindowedValue.WindowedValueCoder<KV<K, InputT>> windowedInputCoder =
+ ctx.instantiateCoder(inputId, pipeline.getComponents());
final KvCoder<K, InputT> kvInputCoder = (KvCoder<K, InputT>)
windowedInputCoder.getValueCoder();
final Coder<WindowedValue<KV<K, InputT>>> elementCoder =
WindowedValue.FullWindowedValueCoder.of(kvInputCoder, windowCoder);
@@ -134,7 +136,10 @@ class GroupByKeyTranslator<K, InputT, OutputT>
(SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow>)
SystemReduceFn.buffering(kvInputCoder.getValueCoder());
- MessageStream<OpMessage<KV<K, OutputT>>> outputStream =
+ final RunnerApi.PCollection input =
pipeline.getComponents().getPcollectionsOrThrow(inputId);
+ final PCollection.IsBounded isBounded =
SamzaPipelineTranslatorUtils.isBounded(input);
+
+ final MessageStream<OpMessage<KV<K, OutputT>>> outputStream =
doTranslateGBK(
inputStream,
needRepartition,
@@ -144,7 +149,8 @@ class GroupByKeyTranslator<K, InputT, OutputT>
elementCoder,
topologyId,
nodeFullname,
- outputTag);
+ outputTag,
+ isBounded);
ctx.registerMessageStream(ctx.getOutputId(transform), outputStream);
}
@@ -157,7 +163,8 @@ class GroupByKeyTranslator<K, InputT, OutputT>
Coder<WindowedValue<KV<K, InputT>>> elementCoder,
int topologyId,
String nodeFullname,
- TupleTag<KV<K, OutputT>> outputTag) {
+ TupleTag<KV<K, OutputT>> outputTag,
+ PCollection.IsBounded isBounded) {
final MessageStream<OpMessage<KV<K, InputT>>> filteredInputStream =
inputStream.filter(msg -> msg.getType() == OpMessage.Type.ELEMENT);
@@ -173,6 +180,7 @@ class GroupByKeyTranslator<K, InputT, OutputT>
KVSerde.of(
SamzaCoders.toSerde(kvInputCoder.getKeyCoder()),
SamzaCoders.toSerde(elementCoder)),
+ // TODO: infer a fixed id from the name
"gbk-" + topologyId)
.map(kv -> OpMessage.ofElement(kv.getValue()));
}
@@ -194,7 +202,10 @@ class GroupByKeyTranslator<K, InputT, OutputT>
reduceFn,
windowingStrategy,
new DoFnOp.SingleOutputManagerFactory<>(),
- nodeFullname)));
+ nodeFullname,
+ // TODO: infer a fixed id from the name
+ outputTag.getId(),
+ isBounded)));
return outputStream;
}
diff --git
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ImpulseTranslator.java
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ImpulseTranslator.java
index baa51ab..2ccbd6c 100644
---
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ImpulseTranslator.java
+++
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ImpulseTranslator.java
@@ -21,7 +21,9 @@ import
org.apache.beam.runners.core.construction.graph.PipelineNode;
import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
import org.apache.beam.runners.samza.runtime.OpMessage;
import org.apache.samza.operators.KV;
+import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.serializers.Serde;
import org.apache.samza.system.descriptors.GenericInputDescriptor;
import org.apache.samza.system.descriptors.GenericSystemDescriptor;
@@ -40,8 +42,11 @@ public class ImpulseTranslator implements
TransformTranslator {
final String outputId = ctx.getOutputId(transform);
final GenericSystemDescriptor systemDescriptor =
new GenericSystemDescriptor(outputId,
SamzaImpulseSystemFactory.class.getName());
+
+ // The KvCoder is needed here for Samza not to crop the key.
+ final Serde<KV<?, OpMessage<byte[]>>> kvSerde = KVSerde.of(new
NoOpSerde(), new NoOpSerde<>());
final GenericInputDescriptor<KV<?, OpMessage<byte[]>>> inputDescriptor =
- systemDescriptor.getInputDescriptor(outputId, new NoOpSerde<>());
+ systemDescriptor.getInputDescriptor(outputId, kvSerde);
ctx.registerInputMessageStream(outputId, inputDescriptor);
}
diff --git
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
index e4651c4..ad699af 100644
---
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
+++
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
@@ -148,6 +148,9 @@ class ParDoBoundMultiTranslator<InT, OutT>
idToPValueMap,
new DoFnOp.MultiOutputManagerFactory(tagToIndexMap),
node.getFullName(),
+ // TODO: infer a fixed id from the name
+ String.valueOf(ctx.getCurrentTopologicalId()),
+ input.isBounded(),
false,
null,
Collections.emptyMap(),
@@ -237,6 +240,9 @@ class ParDoBoundMultiTranslator<InT, OutT>
final DoFnSchemaInformation doFnSchemaInformation;
doFnSchemaInformation =
ParDoTranslation.getSchemaInformation(transform.getTransform());
+ final RunnerApi.PCollection input =
pipeline.getComponents().getPcollectionsOrThrow(inputId);
+ final PCollection.IsBounded isBounded =
SamzaPipelineTranslatorUtils.isBounded(input);
+
final DoFnOp<InT, OutT, RawUnionValue> op =
new DoFnOp<>(
mainOutputTag,
@@ -250,6 +256,9 @@ class ParDoBoundMultiTranslator<InT, OutT>
Collections.emptyMap(), // idToViewMap not in use until side input
support
new DoFnOp.MultiOutputManagerFactory(tagToIndexMap),
nodeFullname,
+ // TODO: infer a fixed id from the name
+ String.valueOf(ctx.getCurrentTopologicalId()),
+ isBounded,
true,
stagePayload,
idToTupleTagMap,
diff --git
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPortablePipelineTranslator.java
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPortablePipelineTranslator.java
index 1f18a75..5974cb6 100644
---
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPortablePipelineTranslator.java
+++
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPortablePipelineTranslator.java
@@ -23,6 +23,7 @@ import java.util.ServiceLoader;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.graph.PipelineNode;
import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.samza.SamzaPipelineOptions;
import
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,7 +63,8 @@ public class SamzaPortablePipelineTranslator {
}
}
- public static void createConfig(RunnerApi.Pipeline pipeline, ConfigBuilder
configBuilder) {
+ public static void createConfig(
+ RunnerApi.Pipeline pipeline, ConfigBuilder configBuilder,
SamzaPipelineOptions options) {
QueryablePipeline queryablePipeline =
QueryablePipeline.forTransforms(
pipeline.getRootTransformIdsList(), pipeline.getComponents());
@@ -72,7 +74,7 @@ public class SamzaPortablePipelineTranslator {
TRANSLATORS.get(transform.getTransform().getSpec().getUrn());
if (translator instanceof TransformConfigGenerator) {
TransformConfigGenerator configGenerator = (TransformConfigGenerator)
translator;
- configBuilder.putAll(configGenerator.createPortableConfig(transform));
+ configBuilder.putAll(configGenerator.createPortableConfig(transform,
options));
}
}
}
diff --git
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TransformConfigGenerator.java
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TransformConfigGenerator.java
index 7b03bb0..b6ac2ba 100644
---
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TransformConfigGenerator.java
+++
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TransformConfigGenerator.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.samza.translation;
import java.util.Collections;
import java.util.Map;
import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.PTransform;
@@ -32,7 +33,8 @@ public interface TransformConfigGenerator<T extends
PTransform<?, ?>> {
}
/** Generate config for portable api PTransform. */
- default Map<String, String> createPortableConfig(PipelineNode.PTransformNode
transform) {
+ default Map<String, String> createPortableConfig(
+ PipelineNode.PTransformNode transform, SamzaPipelineOptions options) {
return Collections.emptyMap();
}
}
diff --git
a/runners/samza/src/main/java/org/apache/beam/runners/samza/util/SamzaPipelineTranslatorUtils.java
b/runners/samza/src/main/java/org/apache/beam/runners/samza/util/SamzaPipelineTranslatorUtils.java
index 781ca68..50cccfe 100644
---
a/runners/samza/src/main/java/org/apache/beam/runners/samza/util/SamzaPipelineTranslatorUtils.java
+++
b/runners/samza/src/main/java/org/apache/beam/runners/samza/util/SamzaPipelineTranslatorUtils.java
@@ -26,6 +26,7 @@ import
org.apache.beam.runners.core.construction.graph.QueryablePipeline;
import org.apache.beam.runners.fnexecution.wire.WireCoders;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.WindowingStrategy;
import
org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
@@ -74,4 +75,20 @@ public final class SamzaPipelineTranslatorUtils {
(WindowingStrategy<?, BoundedWindow>) windowingStrategy;
return ret;
}
+
+ /**
+ * Escape the non-alphabet chars in the name so we can create a physical
stream out of it.
+ *
+ * <p>This escape will replace ".", "(" and "/" as "-", and then remove all
the other
+ * non-alphabetic characters.
+ */
+ public static String escape(String name) {
+ return name.replaceAll("[\\.(/]", "-").replaceAll("[^A-Za-z0-9-_]", "");
+ }
+
+ public static PCollection.IsBounded isBounded(RunnerApi.PCollection
pCollection) {
+ return pCollection.getIsBounded() == RunnerApi.IsBounded.Enum.BOUNDED
+ ? PCollection.IsBounded.BOUNDED
+ : PCollection.IsBounded.UNBOUNDED;
+ }
}
diff --git
a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java
b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java
index 25ef315..9328676 100644
---
a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java
+++
b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java
@@ -36,6 +36,7 @@ import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.samza.config.MapConfig;
@@ -78,7 +79,7 @@ public class SamzaTimerInternalsFactoryTest {
final TupleTag<?> mainOutputTag = new TupleTag<>("output");
return SamzaStoreStateInternals.createStateInternalFactory(
- null, context, pipelineOptions, null, mainOutputTag);
+ "42", null, context, pipelineOptions, null);
}
private static SamzaTimerInternalsFactory<String>
createTimerInternalsFactory(
@@ -96,6 +97,7 @@ public class SamzaTimerInternalsFactoryTest {
timerStateId,
nonKeyedStateInternalsFactory,
(WindowingStrategy) WindowingStrategy.globalDefault(),
+ PCollection.IsBounded.BOUNDED,
pipelineOptions);
}
diff --git a/settings.gradle b/settings.gradle
index c453272..daeb827 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -81,6 +81,8 @@ include "beam-runners-spark"
project(":beam-runners-spark").dir = file("runners/spark")
include "beam-runners-samza"
project(":beam-runners-samza").dir = file("runners/samza")
+include "beam-runners-samza-job-server"
+project(":beam-runners-samza-job-server").dir =
file("runners/samza/job-server")
include "beam-sdks-go"
project(":beam-sdks-go").dir = file("sdks/go")
include "beam-sdks-go-container"