ihji commented on a change in pull request #15767:
URL: https://github.com/apache/beam/pull/15767#discussion_r793139264



##########
File path: 
runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ValidateRunnerXlangTest.java
##########
@@ -83,62 +84,198 @@
  * 
(https://docs.google.com/document/d/1xQp0ElIV84b8OCVz8CD2hvbiWdR8w4BvWxPTZJZA6NA";)
 for further
  * details.
  */
-@RunWith(JUnit4.class)
-public class ValidateRunnerXlangTest implements Serializable {
-  @Rule public transient TestPipeline testPipeline = TestPipeline.create();
-  private PipelineResult pipelineResult;
+public class ValidateRunnerXlangTest {
+  static class ValidateRunnerXlangTestBase implements Serializable {
+    @Rule public transient TestPipeline testPipeline = TestPipeline.create();
+    private PipelineResult pipelineResult;
 
-  // URNs for core cross-language transforms.
-  // See 
https://docs.google.com/document/d/1xQp0ElIV84b8OCVz8CD2hvbiWdR8w4BvWxPTZJZA6NA 
for further
-  // details.
-  private static final String TEST_PREFIX_URN = 
"beam:transforms:xlang:test:prefix";
-  private static final String TEST_MULTI_URN = 
"beam:transforms:xlang:test:multi";
-  private static final String TEST_GBK_URN = "beam:transforms:xlang:test:gbk";
-  private static final String TEST_CGBK_URN = 
"beam:transforms:xlang:test:cgbk";
-  private static final String TEST_COMGL_URN = 
"beam:transforms:xlang:test:comgl";
-  private static final String TEST_COMPK_URN = 
"beam:transforms:xlang:test:compk";
-  private static final String TEST_FLATTEN_URN = 
"beam:transforms:xlang:test:flatten";
-  private static final String TEST_PARTITION_URN = 
"beam:transforms:xlang:test:partition";
-  private static final String TEST_PYTHON_BS4_URN = 
"beam:transforms:xlang:test:python_bs4";
+    // URNs for core cross-language transforms.
+    // See 
https://docs.google.com/document/d/1xQp0ElIV84b8OCVz8CD2hvbiWdR8w4BvWxPTZJZA6NA 
for
+    // further
+    // details.
+    private static final String TEST_PREFIX_URN = 
"beam:transforms:xlang:test:prefix";
+    private static final String TEST_MULTI_URN = 
"beam:transforms:xlang:test:multi";
+    private static final String TEST_GBK_URN = 
"beam:transforms:xlang:test:gbk";
+    private static final String TEST_CGBK_URN = 
"beam:transforms:xlang:test:cgbk";
+    private static final String TEST_COMGL_URN = 
"beam:transforms:xlang:test:comgl";
+    private static final String TEST_COMPK_URN = 
"beam:transforms:xlang:test:compk";
+    private static final String TEST_FLATTEN_URN = 
"beam:transforms:xlang:test:flatten";
+    private static final String TEST_PARTITION_URN = 
"beam:transforms:xlang:test:partition";
+    private static final String TEST_PYTHON_BS4_URN = 
"beam:transforms:xlang:test:python_bs4";
 
-  private static String expansionAddr;
-  private static String expansionJar;
+    private static String expansionAddr;
+    private static String expansionJar;
 
-  @BeforeClass
-  public static void setUpClass() {
-    expansionAddr =
-        String.format("localhost:%s", 
Integer.valueOf(System.getProperty("expansionPort")));
-    expansionJar = System.getProperty("expansionJar");
-  }
+    @BeforeClass
+    public static void setUpClass() {
+      expansionAddr =
+          String.format("localhost:%s", 
Integer.valueOf(System.getProperty("expansionPort")));
+      expansionJar = System.getProperty("expansionJar");
+    }
 
-  @Before
-  public void setUp() {
-    ExperimentalOptions.addExperiment(
-        testPipeline.getOptions().as(ExperimentalOptions.class), 
"jar_packages=" + expansionJar);
-    waitForReady();
-  }
+    @Before
+    public void setUp() {
+      ExperimentalOptions.addExperiment(
+          testPipeline.getOptions().as(ExperimentalOptions.class), 
"jar_packages=" + expansionJar);
+      waitForReady();
+    }
 
-  @After
-  public void tearDown() {
-    pipelineResult = testPipeline.run();
-    pipelineResult.waitUntilFinish();
-    assertThat(pipelineResult.getState(), equalTo(PipelineResult.State.DONE));
-  }
+    @After
+    public void tearDown() {
+      pipelineResult = testPipeline.run();
+      pipelineResult.waitUntilFinish();
+      assertThat(pipelineResult.getState(), 
equalTo(PipelineResult.State.DONE));
+    }
 
-  private void waitForReady() {
-    try {
-      ManagedChannel channel = 
ManagedChannelBuilder.forTarget(expansionAddr).build();
-      ConnectivityState state = channel.getState(true);
-      for (int retry = 0; retry < 30 && state != ConnectivityState.READY; 
retry++) {
-        Thread.sleep(500);
-        state = channel.getState(true);
+    private void waitForReady() {
+      try {
+        ManagedChannel channel = 
ManagedChannelBuilder.forTarget(expansionAddr).build();
+        ConnectivityState state = channel.getState(true);
+        for (int retry = 0; retry < 30 && state != ConnectivityState.READY; 
retry++) {
+          Thread.sleep(500);
+          state = channel.getState(true);
+        }
+        channel.shutdownNow();
+      } catch (InterruptedException e) {
+        throw new RuntimeException("interrupted.");
       }
-      channel.shutdownNow();
-    } catch (InterruptedException e) {
-      throw new RuntimeException("interrupted.");
     }
-  }
 
+    private byte[] toStringPayloadBytes(String data) throws IOException {
+      Row configRow =
+          Row.withSchema(Schema.of(Field.of("data", FieldType.STRING)))
+              .withFieldValue("data", data)
+              .build();
+
+      ByteString.Output outputStream = ByteString.newOutput();
+      try {
+        RowCoder.of(configRow.getSchema()).encode(configRow, outputStream);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      ExternalTransforms.ExternalConfigurationPayload payload =
+          ExternalTransforms.ExternalConfigurationPayload.newBuilder()
+              
.setSchema(SchemaTranslation.schemaToProto(configRow.getSchema(), false))
+              .setPayload(outputStream.toByteString())
+              .build();
+      return payload.toByteArray();
+    }
+
+    protected void singleInputOutputTest(Pipeline pipeline) throws IOException 
{
+      PCollection<String> col =
+          pipeline
+              .apply(Create.of("1", "2", "3"))
+              .apply(External.of(TEST_PREFIX_URN, toStringPayloadBytes("0"), 
expansionAddr));
+      PAssert.that(col).containsInAnyOrder("01", "02", "03");
+    }
+
+    protected void multiInputOutputWithSideInputTest(Pipeline pipeline) {
+      PCollection<String> main1 = pipeline.apply("createMain1", Create.of("a", 
"bb"));
+      PCollection<String> main2 = pipeline.apply("createMain2", Create.of("x", 
"yy", "zzz"));
+      PCollection<String> side = pipeline.apply("createSide", Create.of("s"));
+      PCollectionTuple pTuple =
+          PCollectionTuple.of("main1", main1)
+              .and("main2", main2)
+              .and("side", side)
+              .apply(External.of(TEST_MULTI_URN, new byte[] {}, 
expansionAddr).withMultiOutputs());

Review comment:
       Ack.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to