scwhittle commented on code in PR #33580:
URL: https://github.com/apache/beam/pull/33580#discussion_r1914521841
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ShardedKey.java:
##########
@@ -35,4 +35,9 @@ public static ShardedKey create(ByteString key, long
shardingKey) {
public final String toString() {
return String.format("%016x", shardingKey());
}
+
+ @Override
+ public final int hashCode() {
+ return Long.hashCode(shardingKey());
Review Comment:
ditto add comment
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##########
@@ -191,7 +191,8 @@ public boolean throwExceptionsForLargeOutput() {
}
public boolean workIsFailed() {
- return Optional.ofNullable(work).map(Work::isFailed).orElse(false);
+ if (work != null) return work.isFailed();
Review Comment:
keep single line?
return work == null ? false : work.isFailed();
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillComputationKey.java:
##########
@@ -45,4 +46,9 @@ public final String toString() {
return String.format(
"%s: %s-%d", computationId(), TextFormat.escapeBytes(key()),
shardingKey());
}
+
+ @Override
+ public final int hashCode() {
+ return Objects.hash(shardingKey(), computationId());
Review Comment:
add comment?
// Sharding key collisions are unexpected
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java:
##########
@@ -406,36 +404,27 @@ private static boolean useNewTimerTagEncoding(TimerData
timerData) {
*/
public static ByteString timerTag(WindmillNamespacePrefix prefix, TimerData
timerData) {
String tagString;
- ExposedByteArrayOutputStream out = new ExposedByteArrayOutputStream();
- try {
- if (useNewTimerTagEncoding(timerData)) {
- tagString =
- new StringBuilder()
- .append(prefix.byteString().toStringUtf8()) // this never ends
with a slash
- .append(
- timerData.getNamespace().stringKey()) // this must begin
and end with a slash
- .append('+')
- .append(timerData.getTimerId()) // this is arbitrary;
currently unescaped
- .append('+')
- .append(timerData.getTimerFamilyId())
- .toString();
- out.write(tagString.getBytes(StandardCharsets.UTF_8));
- } else {
- // Timers without timerFamily would have timerFamily would be an empty
string
- tagString =
- new StringBuilder()
- .append(prefix.byteString().toStringUtf8()) // this never ends
with a slash
- .append(
- timerData.getNamespace().stringKey()) // this must begin
and end with a slash
- .append('+')
- .append(timerData.getTimerId()) // this is arbitrary;
currently unescaped
- .toString();
- out.write(tagString.getBytes(StandardCharsets.UTF_8));
- }
- return ByteString.readFrom(new
ExposedByteArrayInputStream(out.toByteArray()));
- } catch (IOException e) {
- throw new RuntimeException(e);
+ if (useNewTimerTagEncoding(timerData)) {
+ tagString =
+ new StringBuilder()
+ .append(prefix.byteString().toStringUtf8()) // this never ends
with a slash
+ .append(timerData.getNamespace().stringKey()) // this must begin
and end with a slash
+ .append('+')
+ .append(timerData.getTimerId()) // this is arbitrary; currently
unescaped
+ .append('+')
+ .append(timerData.getTimerFamilyId())
+ .toString();
+ } else {
+ // Timers without timerFamily would have timerFamily would be an empty
string
+ tagString =
+ new StringBuilder()
+ .append(prefix.byteString().toStringUtf8()) // this never ends
with a slash
+ .append(timerData.getNamespace().stringKey()) // this must begin
and end with a slash
+ .append('+')
+ .append(timerData.getTimerId()) // this is arbitrary; currently
unescaped
+ .toString();
}
+ return ByteString.copyFromUtf8(tagString);
Review Comment:
There is also beam's ByteStringOutputStream but given that these are short
I'm guessing this could be faster.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -332,7 +330,9 @@ private synchronized ImmutableMap<ShardedKey, WorkId>
getStuckCommitsAt(
* {@link ActiveWorkState}, and not committed back to Windmill.
*/
GetWorkBudget currentActiveWorkBudget() {
- return activeGetWorkBudget.get();
+ synchronized (this) {
+ return activeGetWorkBudget;
Review Comment:
just make method synchronized
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]