mynameborat commented on code in PR #27131:
URL: https://github.com/apache/beam/pull/27131#discussion_r1230423306
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/PortableBundleManager.java:
##########
@@ -155,34 +169,52 @@ public void processTimer(KeyedTimerData<Void>
keyedTimerData, OpEmitter<OutT> em
public void signalFailure(Throwable t) {
LOG.error("Encountered error during processing the message. Discarding the
output due to: ", t);
- isBundleStarted = false;
- currentBundleElementCount = 0;
- bundleStartTime = Long.MAX_VALUE;
- pendingBundleCount--;
+ isBundleStarted.set(false);
+ currentBundleElementCount.set(0);
+ bundleStartTime.set(Long.MAX_VALUE);
+ pendingBundleCount.decrementAndGet();
+
+ if (!isBundleStarted.get() && currentBundleElementCount.get() != 0) {
+ LOG.warn(
+ "signalFailure: isBundleStarted = false, but
currentBundleElementCount = {}",
+ currentBundleElementCount);
+ }
}
@Override
public void tryFinishBundle(OpEmitter<OutT> emitter) {
- if (shouldFinishBundle()) {
- LOG.debug("Finishing the current bundle.");
- isBundleStarted = false;
- currentBundleElementCount = 0;
- bundleStartTime = Long.MAX_VALUE;
+ LOG.debug("tryFinishBundle: elementCount={}", currentBundleElementCount);
+ if (shouldFinishBundle() && isBundleStarted.compareAndSet(true, false)) {
+ LOG.debug("Finishing the current bundle. Bundle={}", this);
+ currentBundleElementCount.set(0);
+ bundleStartTime.set(Long.MAX_VALUE);
Instant watermarkHold = bundleWatermarkHold;
bundleWatermarkHold = null;
- pendingBundleCount--;
+ pendingBundleCount.decrementAndGet();
+
+ if (currentBundleElementCount.get() != 0) {
+ LOG.warn("elementCount increased while tryFinishBundle!");
+ }
+ if (!isBundleStarted.get()) {
Review Comment:
The condition should be inverted to check whether some other thread started the
bundle while it was being finished.
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/PortableBundleManager.java:
##########
@@ -110,15 +112,22 @@ private void scheduleNextBundleCheck() {
@Override
public void tryStartBundle() {
- currentBundleElementCount++;
+ currentBundleElementCount.incrementAndGet();
+ LOG.debug(
+ "tryStartBundle: elementCount={}, Bundle={}",
currentBundleElementCount, this.toString());
- if (!isBundleStarted) {
+ if (isBundleStarted.compareAndSet(false, true)) {
LOG.debug("Starting a new bundle.");
- isBundleStarted = true;
- bundleStartTime = System.currentTimeMillis();
- pendingBundleCount++;
+ bundleStartTime.set(System.currentTimeMillis());
+ pendingBundleCount.getAndIncrement();
bundleProgressListener.onBundleStarted();
}
+
+ if (!isBundleStarted.get() && currentBundleElementCount.get() != 0) {
+ LOG.warn(
+ "tryStartBundle: isBundleStarted = false, but
currentBundleElementCount = {}",
+ currentBundleElementCount);
Review Comment:
Extract this into a method, since pretty much all three places use the same
assertion.
Also, prefer to have this at the top of the method: it lets us fail fast
(if we decide to in the future), and it makes it easier to reason that the
state change happened elsewhere, in other places that mutate this variable, as
opposed to adding one more branch to account for whether the code above changed
something.
Let me know if you have any other reasons to put this at the end of the
method.
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/util/PortableUtils.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.util;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.samza.SamzaPipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/** A utility class to encapsulate. */
+public class PortableUtils {
+ public static final String BEAM_PORTABLE_MODE = "beam.portable.mode";
+ public static final String PORTABLE_JOB_SERVER_ENDPOINT =
"job.server.endpoint";
+
+ private PortableUtils() {}
+
+ /**
+ * A helper method to distinguish if a pipeline is run using portable mode
or classic mode.
+ *
+ * @param options pipeline options
+ * @return true if the pipeline is run in portable mode
+ */
+ public static boolean isPortable(SamzaPipelineOptions options) {
+ Map<String, String> override = options.getConfigOverride();
+ if (override == null) {
+ return false;
+ }
+
+ return Boolean.parseBoolean(override.getOrDefault("beam.portable.mode",
"false"));
+ }
+
+ public static List<String> getServerDriverArgs() {
+ // assigning ports to 0 so that JobServerDriver can dynamically assign
ports
+ return Arrays.asList("--job-port=0", "--artifact-port=0",
"--expansion-port=0");
+ }
+
+ public static PipelineOptions getPipelineOptions(JobInfo jobInfo) {
+ return PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions());
+ }
+
+ /** Updates the Samza portable framework main class runner arguments with
jobServerUrl. */
+ public static List<String> updateSamzaPortableMainArgs(List<String> args,
String jobServerUrl) {
+
+ // set the job server endpoint
+ args.add("--config");
+ args.add(String.format("%s=%s",
PortableUtils.PORTABLE_JOB_SERVER_ENDPOINT, jobServerUrl));
+
+ return args;
+ }
+
+ /** Updates the Spark portable framework main class runner arguments with
jobServerUrl. */
+ public static List<String> updateSparkPortableMainArgs(List<String> args,
String jobServerUrl) {
+
+ // set the job server endpoint
+ args.add(jobServerUrl);
+
+ return args;
+ }
+
+ public static List<String> updateSamzaPortableArgs(String mainClassName,
String[] args) {
+ List<String> updatedArgs = new ArrayList<>(Arrays.asList(args));
+ // Set main class to run
+ updatedArgs.add(0, mainClassName);
+
+ // set the flag for portable
+ updatedArgs.add("--config");
+ updatedArgs.add(String.format("%s=%s", PortableUtils.BEAM_PORTABLE_MODE,
"true"));
+
+ return updatedArgs;
Review Comment:
Where are these methods used in the context of this PR? Do we need all
dependencies in OSS to port this util class to Beam OSS?
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/PortableBundleManager.java:
##########
@@ -110,15 +112,22 @@ private void scheduleNextBundleCheck() {
@Override
public void tryStartBundle() {
- currentBundleElementCount++;
+ currentBundleElementCount.incrementAndGet();
Review Comment:
There are two ways to interpret the semantics of `currentBundleElementCount`:
1. An element is counted towards a bundle regardless of whether
`onBundleStarted` succeeds or fails.
2. An element is counted towards a bundle only if the bundle has been started
successfully, i.e. `onBundleStarted` completed successfully in this case, along
with other initializations.
I want to make sure the choice of putting this up top is a conscious one and
that you are choosing option 1. For the element count it doesn't matter much,
but I want to see what your semantics are for any potential future state that
is introduced, and whether it will be uninitialized vs. initialized regardless
of the success criteria.
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/util/PortableUtils.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.util;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.samza.SamzaPipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/** A utility class to encapsulate. */
+public class PortableUtils {
+ public static final String BEAM_PORTABLE_MODE = "beam.portable.mode";
+ public static final String PORTABLE_JOB_SERVER_ENDPOINT =
"job.server.endpoint";
+
+ private PortableUtils() {}
+
+ /**
+ * A helper method to distinguish if a pipeline is run using portable mode
or classic mode.
+ *
+ * @param options pipeline options
+ * @return true if the pipeline is run in portable mode
+ */
+ public static boolean isPortable(SamzaPipelineOptions options) {
+ Map<String, String> override = options.getConfigOverride();
+ if (override == null) {
+ return false;
+ }
+
+ return Boolean.parseBoolean(override.getOrDefault("beam.portable.mode",
"false"));
Review Comment:
s/"beam.portable.mode"/BEAM_PORTABLE_MODE
##########
runners/samza/src/test/java/org/apache/beam/runners/samza/util/PortableUtilsTest.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.util;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.runners.samza.SamzaPipelineOptions;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class PortableUtilsTest {
+
+ @Test
+ public void testNonPortableMode() {
+ SamzaPipelineOptions mockOptions = mock(SamzaPipelineOptions.class);
+ Map<String, String> config = new HashMap<>();
+ config.put(PortableUtils.BEAM_PORTABLE_MODE, "false");
+ doReturn(config).when(mockOptions).getConfigOverride();
+ Assert.assertFalse("Expected false for portable mode ",
PortableUtils.isPortable(mockOptions));
Review Comment:
Maybe add another assertion, prior to stubbing the mock's return value, to
validate the null case, which should also return `false`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]