robertwb commented on a change in pull request #14607:
URL: https://github.com/apache/beam/pull/14607#discussion_r617748705



##########
File path: examples/java/src/main/java/org/apache/beam/examples/WordCount.java
##########
@@ -17,9 +17,9 @@
  */
 package org.apache.beam.examples;
 
+import java.util.Arrays;

Review comment:
       Just a note that we'll want to omit this change.

##########
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java
##########
@@ -194,6 +195,7 @@ public OutputT expand(InputT input) {
                     PValues.expandInput(PBegin.in(p)),
                     ImmutableMap.of(entry.getKey(), (PCollection<?>) entry.getValue()),
                     Impulse.create(),
+                    ResourceHints.create(),

Review comment:
       Maybe drop a TODO here about figuring out what to do with resource hints 
across multi-language pipelines?

##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
##########
@@ -503,7 +509,16 @@ public void visitPrimitiveTransform(TransformHierarchy.Node node) {
           node.getFullName());
       LOG.debug("Translating {}", transform);
       currentTransform = node.toAppliedPTransform(getPipeline());
+      ResourceHints hints = transform.getResourceHints();
+      // AppliedPTransform instance stores resource hints of current transform merged with outer
+      // hints (e.g. set on outer composites).
+      // Translation reads resource hints from PTransform objects, so update the hints.
+      transform.setResourceHints(currentTransform.getResourceHints());
       translator.translate(transform, this);

Review comment:
       Just a thought: would it be better to let the translator take an
AppliedPTransform, or take the hints as an extra argument, rather than do the
set/unset dance?
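A minimal sketch of the second alternative: the translator receives the merged hints as an explicit argument, so the transform object is never mutated and there is nothing to restore afterwards. `Transform`, `Translator` and `translateNode` are hypothetical stand-ins, not Beam's `PTransform` or `TransformTranslator` API.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: these types only illustrate the signature change.
public class ExplicitHintsSketch {

  // Stand-in for a PTransform; its own hints are never mutated during translation.
  static final class Transform {
    final String name;
    final Map<String, String> ownHints;

    Transform(String name, Map<String, String> ownHints) {
      this.name = name;
      this.ownHints = Collections.unmodifiableMap(new HashMap<>(ownHints));
    }
  }

  // The translator receives the effective (already merged) hints as an argument.
  interface Translator {
    String translate(Transform transform, Map<String, String> effectiveHints);
  }

  // Mirrors the visitPrimitiveTransform step: the merged hints (taken from the
  // AppliedPTransform in the real code) are passed through, not written back.
  static String translateNode(
      Transform transform, Map<String, String> mergedHints, Translator translator) {
    return translator.translate(transform, mergedHints);
  }

  public static void main(String[] args) {
    Translator translator = (t, hints) -> t.name + " " + hints;
    Transform t = new Transform("ParDo(Foo)", Collections.emptyMap());
    Map<String, String> merged =
        Collections.singletonMap("beam:resources:min_ram_bytes:v1", "1000000000");
    System.out.println(translateNode(t, merged, translator));
    // The transform itself is unchanged, so no "unset" step is needed.
    System.out.println(t.ownHints.isEmpty());
  }
}
```

The trade-off is a wider translator signature in exchange for removing hidden mutable state from the translation loop.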

##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
##########
@@ -887,6 +887,9 @@ private Debuggee registerDebuggee(CloudDebugger debuggerClient, String uniquifie
   }
 
   private List<RunnerApi.ArtifactInformation> getDefaultArtifacts() {
+    if (true) {

Review comment:
       Reminder: revert. (This entire file?)

##########
File path: examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
##########
@@ -67,8 +63,8 @@ public void testE2EWordCount() throws Exception {
             .resolve("results", StandardResolveOptions.RESOLVE_FILE)
             .toString());
     WordCount.runWordCount(options);
-    assertThat(
-        new NumberedShardedFile(options.getOutput() + "*-of-*"),
-        fileContentsHaveChecksum(DEFAULT_OUTPUT_CHECKSUM));
+    // assertThat(

Review comment:
       Same.

##########
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
##########
@@ -287,7 +288,8 @@ public static FunctionSpec toProto(WindowFn<?, ?> windowFn, SdkComponents compon
     FunctionSpec windowFnSpec = toProto(windowFn, components);
     String environmentId =
         Strings.isNullOrEmpty(windowingStrategy.getEnvironmentId())
-            ? components.getOnlyEnvironmentId()
+            ? components.getEnvironmentIdFor(
+                ResourceHints.create()) // TODO: get this from elsewhere

Review comment:
       OK, this we need to resolve. I'll do it.

##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowClient.java
##########
@@ -61,6 +66,20 @@ public Job createJob(@Nonnull Job job) throws IOException {
             .locations()
             .jobs()
             .create(options.getProject(), options.getRegion(), job);
+
+    LOG.info("Dataflow v1 job request:\n");

Review comment:
       Do we want to keep this? Lower to LOG.debug?

##########
File path: runners/google-cloud-dataflow-java/examples/build.gradle
##########
@@ -48,7 +48,7 @@ def commonConfig = { dataflowWorkerJar, workerHarnessContainerImage = '', additi
    return {
        testClassesDirs = files(project(":examples:java").sourceSets.test.output.classesDirs)
        include "**/WordCountIT.class"
-       include "**/WindowedWordCountIT.class"
+       // include "**/WindowedWordCountIT.class"

Review comment:
       Note to revert.

##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java
##########
@@ -122,7 +122,8 @@ DataflowPipelineJob run(Pipeline pipeline, DataflowRunner runner) {
       // No metrics in streaming
       allAssertionsPassed = Optional.absent();
     } else {
-      jobSuccess = waitForBatchJobTermination(job, messageHandler);
+      jobSuccess = true;

Review comment:
       Note: revert.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
##########
@@ -489,8 +495,9 @@ void replaceOutputs(Map<PCollection<?>, ReplacementOutput> originalToReplacement
 
     /** Returns the {@link AppliedPTransform} representing this {@link Node}. */
     public AppliedPTransform<?, ?, ?> toAppliedPTransform(Pipeline pipeline) {
+      // TODO: Resource hint nesting.

Review comment:
       We can remove this TODO now.
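The nesting this TODO referred to can be sketched as folding hints from the outermost composite down to the primitive, consistent with the translator comment that an `AppliedPTransform` stores its own hints merged with outer ones. The reconciliation policy below is an assumption (the larger min RAM wins, otherwise the innermost value wins), and `mergeWithOuter` and `effectiveHints` are hypothetical helpers over plain URN-to-string maps rather than the PR's `ResourceHints` class.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: hints are plain URN -> value maps, and the merge policy
// (larger min RAM wins, otherwise the innermost value wins) is an assumption.
public class NestedHintsSketch {
  static final String MIN_RAM_URN = "beam:resources:min_ram_bytes:v1";

  // Returns inner hints layered over outer hints; neither input is modified.
  static Map<String, String> mergeWithOuter(
      Map<String, String> inner, Map<String, String> outer) {
    Map<String, String> merged = new HashMap<>(outer);
    for (Map.Entry<String, String> e : inner.entrySet()) {
      String urn = e.getKey();
      String outerValue = outer.get(urn);
      if (MIN_RAM_URN.equals(urn) && outerValue != null) {
        // A minimum requested further out is still honored: keep the larger value.
        long max = Math.max(Long.parseLong(e.getValue()), Long.parseLong(outerValue));
        merged.put(urn, Long.toString(max));
      } else {
        // Any other hint: the more specific (inner) setting overrides.
        merged.put(urn, e.getValue());
      }
    }
    return merged;
  }

  // Folds hints from the outermost composite down to the primitive transform.
  static Map<String, String> effectiveHints(Iterable<Map<String, String>> outermostFirst) {
    Map<String, String> acc = new HashMap<>();
    for (Map<String, String> level : outermostFirst) {
      acc = mergeWithOuter(level, acc);
    }
    return acc;
  }

  public static void main(String[] args) {
    Map<String, String> composite = new HashMap<>();
    composite.put(MIN_RAM_URN, "4000000000");
    Map<String, String> primitive = new HashMap<>();
    primitive.put(MIN_RAM_URN, "1000000000");
    primitive.put("beam:resources:accelerator:v1", "example-accelerator");
    // min RAM resolves to 4000000000; the accelerator comes from the primitive.
    System.out.println(effectiveHints(Arrays.asList(composite, primitive)));
  }
}
```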

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/resourcehints/ResourceHints.java
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.resourcehints;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.StandardResourceHints;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ProtocolMessageEnum;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+public class ResourceHints {
+  private static final String MIN_RAM_URN = "beam:resources:min_ram_bytes:v1";
+  private static final String ACCELERATOR_URN = "beam:resources:accelerator:v1";
+
+  // TODO: reference this from a common location in all packages that use this.
+  private static String getUrn(ProtocolMessageEnum value) {
+    return value.getValueDescriptor().getOptions().getExtension(RunnerApi.beamUrn);
+  }
+
+  static {
+    checkState(MIN_RAM_URN.equals(getUrn(StandardResourceHints.Enum.MIN_RAM_BYTES)));
+    checkState(ACCELERATOR_URN.equals(getUrn(StandardResourceHints.Enum.ACCELERATOR)));
+  }
+
+  private static ImmutableMap<String, String> hintNameToUrn =
+      ImmutableMap.<String, String>builder()
+          .put("minRam", MIN_RAM_URN)
+          .put("min_ram", MIN_RAM_URN) // Courtesy alias.
+          .put("accelerator", ACCELERATOR_URN)
+          .build();
+
+  private static ImmutableMap<String, Function<String, ResourceHint>> parsers =
+      ImmutableMap.<String, Function<String, ResourceHint>>builder()
+          .put(MIN_RAM_URN, s -> new BytesHint(BytesHint.parse(s)))
+          .put(ACCELERATOR_URN, s -> new StringHint(s))
+          .build();
+
+  private static ResourceHints empty = new ResourceHints(ImmutableMap.of());
+
+  private final ImmutableMap<String, ResourceHint> hints;
+
+  private ResourceHints(ImmutableMap<String, ResourceHint> hints) {
+    this.hints = hints;
+  }
+
+  public static ResourceHints create() {
+    return empty;
+  }
+
+  public static ResourceHints fromOptions(PipelineOptions options) {
+    ResourceHintsOptions resourceHintsOptions = options.as(ResourceHintsOptions.class);
+    ResourceHints result = create();
+    if (resourceHintsOptions.getResourceHints() == null) {
+      return result;
+    }
+    Splitter splitter = Splitter.on('=').limit(2);
+    for (String hint : resourceHintsOptions.getResourceHints()) {
+      List<String> parts = splitter.splitToList(hint);
+      if (parts.size() != 2) {
+        throw new IllegalArgumentException("Unparsable resource hint: " + hint);
+      }
+      String nameOrUrn = parts.get(0);
+      String stringValue = parts.get(1);
+      String urn;
+      if (hintNameToUrn.containsKey(nameOrUrn)) {
+        urn = hintNameToUrn.get(nameOrUrn);
+      } else if (!nameOrUrn.startsWith("beam:resources:")) {
+        // Allow unknown hints to be passed, but validate a little bit to prevent typos.
+        throw new IllegalArgumentException("Unknown resource hint: " + hint);
+      } else {
+        urn = nameOrUrn;
+      }
+      ResourceHint value = parsers.getOrDefault(urn, s -> new StringHint(s)).apply(stringValue);
+      result = result.withHint(urn, value);
+    }
+    return result;
+  }
+
+  /*package*/ static class BytesHint extends ResourceHint {
+    private static Map<String, Long> suffixes =
+        ImmutableMap.<String, Long>builder()
+            .put("B", 1L)

Review comment:
       Per the other comment, should this be case-sensitive?
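If the answer is to accept any case, one option is a lookup table ordered by `String.CASE_INSENSITIVE_ORDER`, sketched below with the suffixes and multipliers from the diff. `CaseInsensitiveSuffixes` and `multiplier` are hypothetical names. One caveat for this design: a lowercase `b` conventionally denotes bits, so accepting `Mb` or `gb` as bytes may surprise some users, whereas a case-sensitive table avoids that ambiguity.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: suffixes and multipliers are copied from the diff; the
// case-insensitive lookup is the assumption being illustrated.
public class CaseInsensitiveSuffixes {
  private static final Map<String, Long> SUFFIXES =
      new TreeMap<>(String.CASE_INSENSITIVE_ORDER);

  static {
    SUFFIXES.put("B", 1L);
    SUFFIXES.put("KB", 1_000L);
    SUFFIXES.put("MB", 1_000_000L);
    SUFFIXES.put("GB", 1_000_000_000L);
    SUFFIXES.put("TB", 1_000_000_000_000L);
    SUFFIXES.put("PB", 1_000_000_000_000_000L);
    SUFFIXES.put("KiB", 1L << 10);
    SUFFIXES.put("MiB", 1L << 20);
    SUFFIXES.put("GiB", 1L << 30);
    SUFFIXES.put("TiB", 1L << 40);
    SUFFIXES.put("PiB", 1L << 50);
  }

  // Accepts "GiB", "gib", "GIB", ... and rejects anything not in the table.
  static long multiplier(String suffix) {
    Long m = SUFFIXES.get(suffix);
    if (m == null) {
      throw new IllegalArgumentException("Unknown byte suffix: " + suffix);
    }
    return m;
  }

  public static void main(String[] args) {
    System.out.println(multiplier("gib"));
    System.out.println(multiplier("Mb"));
  }
}
```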

##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowClient.java
##########
@@ -61,6 +66,20 @@ public Job createJob(@Nonnull Job job) throws IOException {
             .locations()
             .jobs()
             .create(options.getProject(), options.getRegion(), job);
+
+    LOG.info("Dataflow v1 job request:\n");
+    for (Step step : job.getSteps()) {
+      Object resourceHints = step.getProperties().get("resource_hints");
+      if (resourceHints == null) {
+        LOG.info(
+            "Hints for step {}: {}.",
+            step.getName(),
+            resourceHints == null ? "None" : resourceHints.toString());
+      }
+    }
+    if (true) {

Review comment:
       Note to revert.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/resourcehints/ResourceHints.java
##########
@@ -0,0 +1,270 @@
+  /*package*/ static class BytesHint extends ResourceHint {
+    private static Map<String, Long> suffixes =
+        ImmutableMap.<String, Long>builder()
+            .put("B", 1L)
+            .put("KB", 1000L)
+            .put("MB", 1000_000L)
+            .put("GB", 1000_000_000L)
+            .put("TB", 1000_000_000_000L)
+            .put("PB", 1000_000_000_000_000L)
+            .put("KiB", 1L << 10)
+            .put("MiB", 1L << 20)
+            .put("GiB", 1L << 30)
+            .put("TiB", 1L << 40)
+            .put("PiB", 1L << 50)
+            .build();
+
+    private final long value;
+
+    @Override
+    public boolean equals(Object other) {
+      if (this == other) {
+        return true;
+      } else if (other instanceof BytesHint) {
+        return ((BytesHint) other).value == value;
+      } else {
+        return false;
+      }
+    }
+
+    @Override
+    public int hashCode() {
+      return Long.hashCode(value);
+    }
+
+    public BytesHint(long value) {
+      this.value = value;
+    }
+
+    public static long parse(String s) {
+      Matcher m = Pattern.compile("([\\d.]+)[\\s]?(([KMGTP]i?)?B)").matcher(s);
+      // Matcher m = Pattern.compile("^\\d\\.?\\d*").matcher(s);

Review comment:
       Remove?
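Separately from removing the commented-out pattern: the active pattern carries no `^`/`$` anchors, so whether trailing text such as `2GB extra` is rejected depends on calling `matches()` rather than `find()`, and the quoted hunk ends before that call. Below is a minimal sketch of a complete `parse` around the same regex, assuming `Matcher.matches()`, `BigDecimal` arithmetic so values like `1.5 GiB` work, and rounding up to whole bytes because the hint is a minimum. `BytesParseSketch` is hypothetical and not the PR's implementation.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch built around the regex quoted in the diff.
public class BytesParseSketch {
  private static final Pattern PATTERN = Pattern.compile("([\\d.]+)[\\s]?(([KMGTP]i?)?B)");
  private static final Map<String, Long> SUFFIXES = new HashMap<>();

  static {
    SUFFIXES.put("B", 1L);
    SUFFIXES.put("KB", 1_000L);
    SUFFIXES.put("MB", 1_000_000L);
    SUFFIXES.put("GB", 1_000_000_000L);
    SUFFIXES.put("TB", 1_000_000_000_000L);
    SUFFIXES.put("PB", 1_000_000_000_000_000L);
    SUFFIXES.put("KiB", 1L << 10);
    SUFFIXES.put("MiB", 1L << 20);
    SUFFIXES.put("GiB", 1L << 30);
    SUFFIXES.put("TiB", 1L << 40);
    SUFFIXES.put("PiB", 1L << 50);
  }

  static long parse(String s) {
    // Surrounding whitespace is ignored (an assumption of this sketch).
    Matcher m = PATTERN.matcher(s.trim());
    // matches() requires the whole input to match; find() would accept "2GB extra".
    if (!m.matches()) {
      throw new IllegalArgumentException("Unparsable byte size: " + s);
    }
    // Inputs like "1.2.3" pass the regex but throw NumberFormatException here.
    BigDecimal number = new BigDecimal(m.group(1));
    BigDecimal bytes = number.multiply(BigDecimal.valueOf(SUFFIXES.get(m.group(2))));
    // Round fractional bytes up, since the hint expresses a minimum.
    return bytes.setScale(0, RoundingMode.CEILING).longValueExact();
  }

  public static void main(String[] args) {
    System.out.println(parse("1.5 GiB"));
    System.out.println(parse("10GB"));
  }
}
```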




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
