[ 
https://issues.apache.org/jira/browse/BEAM-4399?focusedWorklogId=111849&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111849
 ]

ASF GitHub Bot logged work on BEAM-4399:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 14/Jun/18 11:56
            Start Date: 14/Jun/18 11:56
    Worklog Time Spent: 10m 
      Work Description: lgajowy commented on a change in pull request #5632: 
[BEAM-4399] Change CassandraIOIT to write-then-read
URL: https://github.com/apache/beam/pull/5632#discussion_r195394205
 
 

 ##########
 File path: 
sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOIT.java
 ##########
 @@ -70,159 +65,166 @@
 @RunWith(JUnit4.class)
 public class CassandraIOIT implements Serializable {
 
-  private static IOTestPipelineOptions options;
+  /** CassandraIOIT options. */
+  public interface CassandraIOITOptions extends IOTestPipelineOptions {
+    /* Cassandra */
+    @org.apache.beam.sdk.options.Description("Host for Cassandra server (host 
name/ip address)")
+    List<String> getCassandraHost();
+
+    void setCassandraHost(List<String> host);
+
+    @org.apache.beam.sdk.options.Description("Port for Cassandra server")
+    @Default.Integer(9042)
+    Integer getCassandraPort();
+
+    void setCassandraPort(Integer port);
+  }
 
-  @Rule public transient TestPipeline pipeline = TestPipeline.create();
+  private static final Logger LOG = 
LoggerFactory.getLogger(CassandraIOIT.class);
+
+  private static CassandraIOITOptions options;
+  private static final String KEYSPACE = "BEAM";
+  private static final String TABLE = "BEAM_TEST";
+
+  @Rule public transient TestPipeline pipelineWrite = TestPipeline.create();
+  @Rule public transient TestPipeline pipelineRead = TestPipeline.create();
 
   @BeforeClass
   public static void setup() {
-    PipelineOptionsFactory.register(IOTestPipelineOptions.class);
-    options = TestPipeline.testingPipelineOptions()
-        .as(IOTestPipelineOptions.class);
+    PipelineOptionsFactory.register(CassandraIOITOptions.class);
+    options = 
TestPipeline.testingPipelineOptions().as(CassandraIOITOptions.class);
+
+    dropTable(options, KEYSPACE, TABLE);
+    createTable(options, KEYSPACE, TABLE);
   }
 
   @AfterClass
   public static void tearDown() {
-    // cleanup the write table
-    CassandraTestDataSet.cleanUpDataTable(options);
+    dropTable(options, KEYSPACE, TABLE);
   }
 
 +  /** Tests writing then reading data for a Cassandra database. */
   @Test
-  public void testRead() {
-    PCollection<Scientist> output = 
pipeline.apply(CassandraIO.<Scientist>read()
-        .withHosts(Collections.singletonList(options.getCassandraHost()))
-        .withPort(options.getCassandraPort())
-        .withMinNumberOfSplits(20)
-        .withKeyspace(CassandraTestDataSet.KEYSPACE)
-        .withTable(CassandraTestDataSet.TABLE_READ_NAME)
-        .withEntity(Scientist.class)
-        .withCoder(SerializableCoder.of(Scientist.class)));
-
-    PAssert.thatSingleton(output.apply("Count scientist", 
Count.globally())).isEqualTo(1000L);
-
-    PCollection<KV<String, Integer>> mapped =
-        output.apply(
-            MapElements.via(
-                new SimpleFunction<Scientist, KV<String, Integer>>() {
-                  @Override
-                  public KV<String, Integer> apply(Scientist scientist) {
-                    return KV.of(scientist.name, scientist.id);
-                  }
-                }
-            )
-        );
-    PAssert.that(mapped.apply("Count occurrences per scientist", 
Count.perKey()))
-        .satisfies(
-            input -> {
-              for (KV<String, Long> element : input) {
-                assertEquals(element.getKey(), 1000 / 10, 
element.getValue().longValue());
-              }
-              return null;
-            });
-
-    pipeline.run().waitUntilFinish();
+  public void testWriteThenRead() {
+    runWrite();
+    runRead();
   }
 
-  @Test
-  public void testWrite() {
-    IOTestPipelineOptions options =
-        TestPipeline.testingPipelineOptions().as(IOTestPipelineOptions.class);
-
-    options.setOnSuccessMatcher(
-        new CassandraMatcher(
-            CassandraTestDataSet.getCluster(options),
-            CassandraTestDataSet.TABLE_WRITE_NAME));
-
-    ArrayList<ScientistForWrite> data = new ArrayList<>();
-    for (int i = 0; i < 1000; i++) {
-      ScientistForWrite scientist = new ScientistForWrite();
-      scientist.id = i;
-      scientist.name = "Name " + i;
-      data.add(scientist);
-    }
-
-    pipeline
-        .apply(Create.of(data))
-        .apply(CassandraIO.<ScientistForWrite>write()
-            .withHosts(Collections.singletonList(options.getCassandraHost()))
-            .withPort(options.getCassandraPort())
-            .withKeyspace(CassandraTestDataSet.KEYSPACE)
-            .withEntity(ScientistForWrite.class));
-
-    pipeline.run().waitUntilFinish();
+  private void runWrite() {
+    pipelineWrite
+        .apply("GenSequence", GenerateSequence.from(0).to((long) 
options.getNumberOfRecords()))
+        .apply("PrepareTestRows", ParDo.of(new 
TestRow.DeterministicallyConstructTestRowFn()))
+        .apply("MapToEntity", ParDo.of(new CreateScientistFn()))
+        .apply(
+            "WriteToCassandra",
+            CassandraIO.<Scientist>write()
+                .withHosts(options.getCassandraHost())
+                .withPort(options.getCassandraPort())
+                .withKeyspace(KEYSPACE)
+                .withEntity(Scientist.class));
+
+    pipelineWrite.run().waitUntilFinish();
   }
 
-  /**
-   * Simple matcher.
-   */
-  static class CassandraMatcher extends TypeSafeMatcher<PipelineResult>
-      implements SerializableMatcher<PipelineResult> {
+  private void runRead() {
+    PCollection<Scientist> output =
+        pipelineRead.apply(
+            CassandraIO.<Scientist>read()
+                .withHosts(options.getCassandraHost())
+                .withPort(options.getCassandraPort())
+                .withMinNumberOfSplits(20)
+                .withKeyspace(KEYSPACE)
+                .withTable(TABLE)
+                .withEntity(Scientist.class)
+                .withCoder(SerializableCoder.of(Scientist.class)));
+
+    PAssert.thatSingleton(output.apply("Count All", Count.globally()))
+        .isEqualTo((long) options.getNumberOfRecords());
+
+    PCollection<String> consolidatedHashcode =
+        output
+            .apply(ParDo.of(new SelectNameFn()))
+            .apply("Hash row contents", Combine.globally(new 
HashingFn()).withoutDefaults());
+
+    PAssert.that(consolidatedHashcode)
 
 Review comment:
   nit: Could we use 
`PAssert.thatSingleton(consolidatedHashcode).isEqualTo(expectedHash);` instead? 
It will produce a better error message.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 111849)
    Time Spent: 2h 10m  (was: 2h)

> Change CassandraIOIT to write-then-read Performance Tests
> ---------------------------------------------------------
>
>                 Key: BEAM-4399
>                 URL: https://issues.apache.org/jira/browse/BEAM-4399
>             Project: Beam
>          Issue Type: Sub-task
>          Components: testing
>            Reporter: Łukasz Gajowy
>            Assignee: Ismaël Mejía
>            Priority: Minor
>          Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> CassandraIOIT is different than other IOITs (such as JdbcIOIT or MongodbIOIT) 
> and does not fulfil the rules described in the documentation: 
> [https://beam.apache.org/documentation/io/testing/#i-o-transform-integration-tests].
>  
> We should make it coherent with other tests, more specifically: 
>  - write it in writeThenReadAll style
>  - enable running it with Perfkit
>  - provide Jenkins job to run it periodically



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to