[ 
https://issues.apache.org/jira/browse/BEAM-4399?focusedWorklogId=111847&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111847
 ]

ASF GitHub Bot logged work on BEAM-4399:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 14/Jun/18 11:55
            Start Date: 14/Jun/18 11:55
    Worklog Time Spent: 10m 
      Work Description: lgajowy commented on a change in pull request #5632: 
[BEAM-4399] Change CassandraIOIT to write-then-read
URL: https://github.com/apache/beam/pull/5632#discussion_r195394016
 
 

 ##########
 File path: 
sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOIT.java
 ##########
 @@ -70,159 +65,166 @@
 @RunWith(JUnit4.class)
 public class CassandraIOIT implements Serializable {
 
-  private static IOTestPipelineOptions options;
+  /** CassandraIOIT options. */
+  public interface CassandraIOITOptions extends IOTestPipelineOptions {
+    /* Cassandra */
+    @org.apache.beam.sdk.options.Description("Host for Cassandra server (host 
name/ip address)")
+    List<String> getCassandraHost();
+
+    void setCassandraHost(List<String> host);
+
+    @org.apache.beam.sdk.options.Description("Port for Cassandra server")
+    @Default.Integer(9042)
+    Integer getCassandraPort();
+
+    void setCassandraPort(Integer port);
+  }
 
-  @Rule public transient TestPipeline pipeline = TestPipeline.create();
+  private static final Logger LOG = 
LoggerFactory.getLogger(CassandraIOIT.class);
+
+  private static CassandraIOITOptions options;
+  private static final String KEYSPACE = "BEAM";
+  private static final String TABLE = "BEAM_TEST";
+
+  @Rule public transient TestPipeline pipelineWrite = TestPipeline.create();
+  @Rule public transient TestPipeline pipelineRead = TestPipeline.create();
 
   @BeforeClass
   public static void setup() {
-    PipelineOptionsFactory.register(IOTestPipelineOptions.class);
-    options = TestPipeline.testingPipelineOptions()
-        .as(IOTestPipelineOptions.class);
+    PipelineOptionsFactory.register(CassandraIOITOptions.class);
+    options = 
TestPipeline.testingPipelineOptions().as(CassandraIOITOptions.class);
+
+    dropTable(options, KEYSPACE, TABLE);
+    createTable(options, KEYSPACE, TABLE);
   }
 
   @AfterClass
   public static void tearDown() {
-    // cleanup the write table
-    CassandraTestDataSet.cleanUpDataTable(options);
+    dropTable(options, KEYSPACE, TABLE);
   }
 
+  /** Tests writing then reading data for a HBase database. */
   @Test
-  public void testRead() {
-    PCollection<Scientist> output = 
pipeline.apply(CassandraIO.<Scientist>read()
-        .withHosts(Collections.singletonList(options.getCassandraHost()))
-        .withPort(options.getCassandraPort())
-        .withMinNumberOfSplits(20)
-        .withKeyspace(CassandraTestDataSet.KEYSPACE)
-        .withTable(CassandraTestDataSet.TABLE_READ_NAME)
-        .withEntity(Scientist.class)
-        .withCoder(SerializableCoder.of(Scientist.class)));
-
-    PAssert.thatSingleton(output.apply("Count scientist", 
Count.globally())).isEqualTo(1000L);
-
-    PCollection<KV<String, Integer>> mapped =
-        output.apply(
-            MapElements.via(
-                new SimpleFunction<Scientist, KV<String, Integer>>() {
-                  @Override
-                  public KV<String, Integer> apply(Scientist scientist) {
-                    return KV.of(scientist.name, scientist.id);
-                  }
-                }
-            )
-        );
-    PAssert.that(mapped.apply("Count occurrences per scientist", 
Count.perKey()))
-        .satisfies(
-            input -> {
-              for (KV<String, Long> element : input) {
-                assertEquals(element.getKey(), 1000 / 10, 
element.getValue().longValue());
-              }
-              return null;
-            });
-
-    pipeline.run().waitUntilFinish();
+  public void testWriteThenRead() {
+    runWrite();
+    runRead();
   }
 
-  @Test
-  public void testWrite() {
-    IOTestPipelineOptions options =
-        TestPipeline.testingPipelineOptions().as(IOTestPipelineOptions.class);
-
-    options.setOnSuccessMatcher(
-        new CassandraMatcher(
-            CassandraTestDataSet.getCluster(options),
-            CassandraTestDataSet.TABLE_WRITE_NAME));
-
-    ArrayList<ScientistForWrite> data = new ArrayList<>();
-    for (int i = 0; i < 1000; i++) {
-      ScientistForWrite scientist = new ScientistForWrite();
-      scientist.id = i;
-      scientist.name = "Name " + i;
-      data.add(scientist);
-    }
-
-    pipeline
-        .apply(Create.of(data))
-        .apply(CassandraIO.<ScientistForWrite>write()
-            .withHosts(Collections.singletonList(options.getCassandraHost()))
-            .withPort(options.getCassandraPort())
-            .withKeyspace(CassandraTestDataSet.KEYSPACE)
-            .withEntity(ScientistForWrite.class));
-
-    pipeline.run().waitUntilFinish();
+  private void runWrite() {
+    pipelineWrite
+        .apply("GenSequence", GenerateSequence.from(0).to((long) 
options.getNumberOfRecords()))
+        .apply("PrepareTestRows", ParDo.of(new 
TestRow.DeterministicallyConstructTestRowFn()))
+        .apply("MapToEntity", ParDo.of(new CreateScientistFn()))
+        .apply(
+            "WriteToCassandra",
+            CassandraIO.<Scientist>write()
+                .withHosts(options.getCassandraHost())
+                .withPort(options.getCassandraPort())
+                .withKeyspace(KEYSPACE)
+                .withEntity(Scientist.class));
+
+    pipelineWrite.run().waitUntilFinish();
   }
 
-  /**
-   * Simple matcher.
-   */
-  static class CassandraMatcher extends TypeSafeMatcher<PipelineResult>
-      implements SerializableMatcher<PipelineResult> {
+  private void runRead() {
+    PCollection<Scientist> output =
+        pipelineRead.apply(
+            CassandraIO.<Scientist>read()
+                .withHosts(options.getCassandraHost())
+                .withPort(options.getCassandraPort())
+                .withMinNumberOfSplits(20)
+                .withKeyspace(KEYSPACE)
+                .withTable(TABLE)
+                .withEntity(Scientist.class)
+                .withCoder(SerializableCoder.of(Scientist.class)));
+
+    PAssert.thatSingleton(output.apply("Count All", Count.globally()))
+        .isEqualTo((long) options.getNumberOfRecords());
+
+    PCollection<String> consolidatedHashcode =
+        output
+            .apply(ParDo.of(new SelectNameFn()))
+            .apply("Hash row contents", Combine.globally(new 
HashingFn()).withoutDefaults());
+
+    PAssert.that(consolidatedHashcode)
+        
.containsInAnyOrder(TestRow.getExpectedHashForRowCount(options.getNumberOfRecords()));
+
+    pipelineRead.run().waitUntilFinish();
+  }
 
-    private final String tableName;
-    private final Cluster cluster;
+  private static Cluster getCluster(CassandraIOITOptions options) {
+    return Cluster.builder()
+        .addContactPoints(options.getCassandraHost().toArray(new String[0]))
+        .withPort(options.getCassandraPort())
+        .build();
+  }
 
-    CassandraMatcher(Cluster cluster, String tableName) {
-      this.cluster = cluster;
-      this.tableName = tableName;
-    }
-
-    @Override
-    protected boolean matchesSafely(PipelineResult pipelineResult) {
-      pipelineResult.waitUntilFinish();
-      Session session = cluster.connect();
-      ResultSet result = session.execute("select id,name from " + 
CassandraTestDataSet.KEYSPACE
-          + "." + tableName);
-      List<Row> rows = result.all();
-      if (rows.size() != 1000) {
-        return false;
-      }
-      for (Row row : rows) {
-        if (!row.getString("name").matches("Name.*")) {
-          return false;
-        }
-      }
-      return true;
+  private static void createTable(CassandraIOITOptions options, String 
keyspace, String tableName) {
+    try (Cluster cluster = getCluster(options);
+        Session session = cluster.connect()) {
+      LOG.info("Create {} keyspace if not exists", keyspace);
+      session.execute(
+          "CREATE KEYSPACE IF NOT EXISTS "
+              + KEYSPACE
+              + " WITH REPLICATION = "
+              + "{'class':'SimpleStrategy', 'replication_factor':3};");
+
+      session.execute("USE " + keyspace);
+
+      LOG.info("Create {} table if not exists", tableName);
+      session.execute(
+          "CREATE TABLE IF NOT EXISTS "
+              + tableName
+              + "(id bigint, name text, PRIMARY "
+              + "KEY(id))");
     }
+  }
 
-    @Override
-    public void describeTo(Description description) {
-      description.appendText("Expected Cassandra record pattern is (Name.*)");
+  private static void dropTable(CassandraIOITOptions options, String keyspace, 
String table) {
+    try (Cluster cluster = getCluster(options);
+        Session session = cluster.connect()) {
+      session.execute("DROP KEYSPACE IF EXISTS " + keyspace);
+      session.execute("DROP TABLE IF EXISTS " + keyspace + "." + table);
     }
   }
 
-  /**
-   * Simple Cassandra entity representing a scientist. Used for read test.
-   */
-  @Table(name = CassandraTestDataSet.TABLE_READ_NAME, keyspace = 
CassandraTestDataSet.KEYSPACE)
-  static class Scientist implements Serializable {
+  /** Simple Cassandra entity representing a scientist. Used for read test. */
+  @Table(name = TABLE, keyspace = KEYSPACE)
+  static final class Scientist implements Serializable {
 
 Review comment:
  I forgot to mention this in my earlier review: I think this class can be private (it still compiles and runs).

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 111847)
    Time Spent: 1h 50m  (was: 1h 40m)

> Change CassandraIOIT to write-then-read Performance Tests
> ---------------------------------------------------------
>
>                 Key: BEAM-4399
>                 URL: https://issues.apache.org/jira/browse/BEAM-4399
>             Project: Beam
>          Issue Type: Sub-task
>          Components: testing
>            Reporter: Łukasz Gajowy
>            Assignee: Ismaël Mejía
>            Priority: Minor
>          Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> CassandraIOIT differs from other IOITs (such as JdbcIOIT or MongodbIOIT) 
> and does not follow the rules described in the documentation: 
> [https://beam.apache.org/documentation/io/testing/#i-o-transform-integration-tests].
>  
> We should make it consistent with the other tests; specifically, we should: 
>  - write it in writeThenReadAll style
>  - enable running it with Perfkit
>  - provide Jenkins job to run it periodically



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to