scwhittle commented on code in PR #33580:
URL: https://github.com/apache/beam/pull/33580#discussion_r1914521841


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ShardedKey.java:
##########
@@ -35,4 +35,9 @@ public static ShardedKey create(ByteString key, long 
shardingKey) {
   public final String toString() {
     return String.format("%016x", shardingKey());
   }
+
+  @Override
+  public final int hashCode() {
+    return Long.hashCode(shardingKey());

Review Comment:
   Ditto: please add a comment here as well (same request as for WindmillComputationKey.hashCode).



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##########
@@ -191,7 +191,8 @@ public boolean throwExceptionsForLargeOutput() {
   }
 
   public boolean workIsFailed() {
-    return Optional.ofNullable(work).map(Work::isFailed).orElse(false);
+    if (work != null) return work.isFailed();

Review Comment:
   Could we keep this as a single line? For example:
   return work == null ? false : work.isFailed();



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillComputationKey.java:
##########
@@ -45,4 +46,9 @@ public final String toString() {
     return String.format(
         "%s: %s-%d", computationId(), TextFormat.escapeBytes(key()), 
shardingKey());
   }
+
+  @Override
+  public final int hashCode() {
+    return Objects.hash(shardingKey(), computationId());

Review Comment:
   Could you add a comment here? For example:
   // Sharding key collisions are unexpected



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java:
##########
@@ -406,36 +404,27 @@ private static boolean useNewTimerTagEncoding(TimerData 
timerData) {
    */
   public static ByteString timerTag(WindmillNamespacePrefix prefix, TimerData 
timerData) {
     String tagString;
-    ExposedByteArrayOutputStream out = new ExposedByteArrayOutputStream();
-    try {
-      if (useNewTimerTagEncoding(timerData)) {
-        tagString =
-            new StringBuilder()
-                .append(prefix.byteString().toStringUtf8()) // this never ends 
with a slash
-                .append(
-                    timerData.getNamespace().stringKey()) // this must begin 
and end with a slash
-                .append('+')
-                .append(timerData.getTimerId()) // this is arbitrary; 
currently unescaped
-                .append('+')
-                .append(timerData.getTimerFamilyId())
-                .toString();
-        out.write(tagString.getBytes(StandardCharsets.UTF_8));
-      } else {
-        // Timers without timerFamily would have timerFamily would be an empty 
string
-        tagString =
-            new StringBuilder()
-                .append(prefix.byteString().toStringUtf8()) // this never ends 
with a slash
-                .append(
-                    timerData.getNamespace().stringKey()) // this must begin 
and end with a slash
-                .append('+')
-                .append(timerData.getTimerId()) // this is arbitrary; 
currently unescaped
-                .toString();
-        out.write(tagString.getBytes(StandardCharsets.UTF_8));
-      }
-      return ByteString.readFrom(new 
ExposedByteArrayInputStream(out.toByteArray()));
-    } catch (IOException e) {
-      throw new RuntimeException(e);
+    if (useNewTimerTagEncoding(timerData)) {
+      tagString =
+          new StringBuilder()
+              .append(prefix.byteString().toStringUtf8()) // this never ends 
with a slash
+              .append(timerData.getNamespace().stringKey()) // this must begin 
and end with a slash
+              .append('+')
+              .append(timerData.getTimerId()) // this is arbitrary; currently 
unescaped
+              .append('+')
+              .append(timerData.getTimerFamilyId())
+              .toString();
+    } else {
+      // Timers without timerFamily would have timerFamily would be an empty 
string
+      tagString =
+          new StringBuilder()
+              .append(prefix.byteString().toStringUtf8()) // this never ends 
with a slash
+              .append(timerData.getNamespace().stringKey()) // this must begin 
and end with a slash
+              .append('+')
+              .append(timerData.getTimerId()) // this is arbitrary; currently 
unescaped
+              .toString();
     }
+    return ByteString.copyFromUtf8(tagString);

Review Comment:
   There is also Beam's ByteStringOutputStream, but given that these tag strings are
short, I'm guessing ByteString.copyFromUtf8 could be faster.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -332,7 +330,9 @@ private synchronized ImmutableMap<ShardedKey, WorkId> 
getStuckCommitsAt(
    * {@link ActiveWorkState}, and not committed back to Windmill.
    */
   GetWorkBudget currentActiveWorkBudget() {
-    return activeGetWorkBudget.get();
+    synchronized (this) {
+      return activeGetWorkBudget;

Review Comment:
   Just make the method itself synchronized instead of wrapping the body in a synchronized (this) block.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to