This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a43678  Dataflow Worker: Add worker_id to windmill request headers (#6663)
6a43678 is described below

commit 6a43678b150186558eefd6bfd806247ac208a4ae
Author: Raghu Angadi <rang...@apache.org>
AuthorDate: Thu Nov 8 10:14:42 2018 -0800

    Dataflow Worker: Add worker_id to windmill request headers (#6663)
---
 .../beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java | 1 +
 .../runners/dataflow/worker/windmill/GrpcWindmillServerTest.java  | 8 +++++++-
 .../worker/windmill/src/main/proto/windmill.proto                 | 2 ++
 3 files changed, 10 insertions(+), 1 deletion(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
index 25b2d78..b7e2c69 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
@@ -533,6 +533,7 @@ public class GrpcWindmillServer extends WindmillServerStub {
     return JobHeader.newBuilder()
         .setJobId(options.getJobId())
         .setProjectId(options.getProject())
+        .setWorkerId(options.getWorkerId())
         .build();
   }
 
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServerTest.java
index 19428e2..0e886e6 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServerTest.java
@@ -299,6 +299,7 @@ public class GrpcWindmillServerTest {
                             JobHeader.newBuilder()
                                 .setJobId("job")
                                 .setProjectId("project")
+                                .setWorkerId("worker")
                                 .build()));
                     sawHeader = true;
                   } else {
@@ -522,7 +523,11 @@ public class GrpcWindmillServerTest {
                   errorCollector.checkThat(
                       request.getHeader(),
                       Matchers.equalTo(
-                          JobHeader.newBuilder().setJobId("job").setProjectId("project").build()));
+                          JobHeader.newBuilder()
+                              .setJobId("job")
+                              .setProjectId("project")
+                              .setWorkerId("worker")
+                              .build()));
                   sawHeader = true;
                   LOG.info("Received header");
                 } else {
@@ -649,6 +654,7 @@ public class GrpcWindmillServerTest {
                             JobHeader.newBuilder()
                                 .setJobId("job")
                                 .setProjectId("project")
+                                .setWorkerId("worker")
                                 .build()));
                     sawHeader = true;
                   } else {
diff --git a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
index f96d3fe..d56bb12 100644
--- a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
+++ b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
@@ -446,6 +446,8 @@ message StreamingCommitWorkRequest {
 message JobHeader {
   optional string job_id = 1;
   optional string project_id = 2;
+  // Worker id is meant for logging only. Do not rely on it for other decisions.
+  optional string worker_id = 3;
 }
 
 message StreamingCommitRequestChunk {

Reply via email to