[ 
https://issues.apache.org/jira/browse/BEAM-4399?focusedWorklogId=112792&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-112792
 ]

ASF GitHub Bot logged work on BEAM-4399:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 18/Jun/18 15:50
            Start Date: 18/Jun/18 15:50
    Worklog Time Spent: 10m 
      Work Description: iemejia closed pull request #5632: [BEAM-4399] Change 
CassandraIOIT to write-then-read
URL: https://github.com/apache/beam/pull/5632
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/java/io/cassandra/build.gradle 
b/sdks/java/io/cassandra/build.gradle
index 1729233383d..7b5d00a84cb 100644
--- a/sdks/java/io/cassandra/build.gradle
+++ b/sdks/java/io/cassandra/build.gradle
@@ -24,7 +24,7 @@ enableJavaPerformanceTesting()
 description = "Apache Beam :: SDKs :: Java :: IO :: Cassandra"
 ext.summary = "IO to read and write with Apache Cassandra database"
 
-def cassandra_version = "3.4.0"
+def cassandra_version = "3.5.0"
 
 dependencies {
   compile library.java.guava
diff --git a/sdks/java/io/cassandra/pom.xml b/sdks/java/io/cassandra/pom.xml
index 4b1f9f76729..04381ca2b57 100644
--- a/sdks/java/io/cassandra/pom.xml
+++ b/sdks/java/io/cassandra/pom.xml
@@ -31,7 +31,7 @@
   <description>IO to read and write with Apache Cassandra 
database</description>
 
   <properties>
-    <cassandra.driver.version>3.4.0</cassandra.driver.version>
+    <cassandra.driver.version>3.5.0</cassandra.driver.version>
   </properties>
 
   <dependencies>
diff --git 
a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOIT.java
 
b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOIT.java
index 191201a5f7d..7e5ce379cd6 100644
--- 
a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOIT.java
+++ 
b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOIT.java
@@ -17,40 +17,36 @@
  */
 package org.apache.beam.sdk.io.cassandra;
 
-import static org.junit.Assert.assertEquals;
-
 import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.mapping.annotations.Column;
 import com.datastax.driver.mapping.annotations.PartitionKey;
 import com.datastax.driver.mapping.annotations.Table;
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
-import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.common.HashingFn;
+import org.apache.beam.sdk.io.common.IOITHelper;
 import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.io.common.TestRow;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.Validation;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.SerializableMatcher;
 import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
-import org.hamcrest.Description;
-import org.hamcrest.TypeSafeMatcher;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A test of {@link CassandraIO} on a concrete and independent Cassandra 
instance.
@@ -62,7 +58,9 @@
  * <pre>{@code
  * ./gradlew integrationTest -p sdks/java/io/cassandra 
-DintegrationTestPipelineOptions='[
  * "--cassandraHost=1.2.3.4",
- * "--cassandraPort=9042"]'
+ * "--cassandraPort=9042"
+ * "--numberOfRecords=1000"
+ * ]'
  * --tests org.apache.beam.sdk.io.cassandra.CassandraIOIT
  * -DintegrationTestRunner=direct
  * }</pre>
@@ -70,159 +68,162 @@
 @RunWith(JUnit4.class)
 public class CassandraIOIT implements Serializable {
 
-  private static IOTestPipelineOptions options;
+  /** CassandraIOIT options. */
+  public interface CassandraIOITOptions extends IOTestPipelineOptions {
+    @Description("Host for Cassandra server (host name/ip address)")
+    @Validation.Required
+    List<String> getCassandraHost();
+
+    void setCassandraHost(List<String> host);
+
+    @Description("Port for Cassandra server")
+    @Default.Integer(9042)
+    Integer getCassandraPort();
+
+    void setCassandraPort(Integer port);
+  }
 
-  @Rule public transient TestPipeline pipeline = TestPipeline.create();
+  private static final Logger LOG = 
LoggerFactory.getLogger(CassandraIOIT.class);
+
+  private static CassandraIOITOptions options;
+  private static final String KEYSPACE = "BEAM";
+  private static final String TABLE = "BEAM_TEST";
+
+  @Rule public transient TestPipeline pipelineWrite = TestPipeline.create();
+  @Rule public transient TestPipeline pipelineRead = TestPipeline.create();
 
   @BeforeClass
   public static void setup() {
-    PipelineOptionsFactory.register(IOTestPipelineOptions.class);
-    options = TestPipeline.testingPipelineOptions()
-        .as(IOTestPipelineOptions.class);
+    options = IOITHelper.readIOTestPipelineOptions(CassandraIOITOptions.class);
+
+    dropTable(options, KEYSPACE, TABLE);
+    createTable(options, KEYSPACE, TABLE);
   }
 
   @AfterClass
   public static void tearDown() {
-    // cleanup the write table
-    CassandraTestDataSet.cleanUpDataTable(options);
+    dropTable(options, KEYSPACE, TABLE);
   }
 
+  /** Tests writing then reading data for a HBase database. */
   @Test
-  public void testRead() {
-    PCollection<Scientist> output = 
pipeline.apply(CassandraIO.<Scientist>read()
-        .withHosts(Collections.singletonList(options.getCassandraHost()))
-        .withPort(options.getCassandraPort())
-        .withMinNumberOfSplits(20)
-        .withKeyspace(CassandraTestDataSet.KEYSPACE)
-        .withTable(CassandraTestDataSet.TABLE_READ_NAME)
-        .withEntity(Scientist.class)
-        .withCoder(SerializableCoder.of(Scientist.class)));
-
-    PAssert.thatSingleton(output.apply("Count scientist", 
Count.globally())).isEqualTo(1000L);
-
-    PCollection<KV<String, Integer>> mapped =
-        output.apply(
-            MapElements.via(
-                new SimpleFunction<Scientist, KV<String, Integer>>() {
-                  @Override
-                  public KV<String, Integer> apply(Scientist scientist) {
-                    return KV.of(scientist.name, scientist.id);
-                  }
-                }
-            )
-        );
-    PAssert.that(mapped.apply("Count occurrences per scientist", 
Count.perKey()))
-        .satisfies(
-            input -> {
-              for (KV<String, Long> element : input) {
-                assertEquals(element.getKey(), 1000 / 10, 
element.getValue().longValue());
-              }
-              return null;
-            });
-
-    pipeline.run().waitUntilFinish();
+  public void testWriteThenRead() {
+    runWrite();
+    runRead();
   }
 
-  @Test
-  public void testWrite() {
-    IOTestPipelineOptions options =
-        TestPipeline.testingPipelineOptions().as(IOTestPipelineOptions.class);
-
-    options.setOnSuccessMatcher(
-        new CassandraMatcher(
-            CassandraTestDataSet.getCluster(options),
-            CassandraTestDataSet.TABLE_WRITE_NAME));
-
-    ArrayList<ScientistForWrite> data = new ArrayList<>();
-    for (int i = 0; i < 1000; i++) {
-      ScientistForWrite scientist = new ScientistForWrite();
-      scientist.id = i;
-      scientist.name = "Name " + i;
-      data.add(scientist);
-    }
-
-    pipeline
-        .apply(Create.of(data))
-        .apply(CassandraIO.<ScientistForWrite>write()
-            .withHosts(Collections.singletonList(options.getCassandraHost()))
-            .withPort(options.getCassandraPort())
-            .withKeyspace(CassandraTestDataSet.KEYSPACE)
-            .withEntity(ScientistForWrite.class));
-
-    pipeline.run().waitUntilFinish();
+  private void runWrite() {
+    pipelineWrite
+        .apply("GenSequence", GenerateSequence.from(0).to((long) 
options.getNumberOfRecords()))
+        .apply("PrepareTestRows", ParDo.of(new 
TestRow.DeterministicallyConstructTestRowFn()))
+        .apply("MapToEntity", ParDo.of(new CreateScientistFn()))
+        .apply(
+            "WriteToCassandra",
+            CassandraIO.<Scientist>write()
+                .withHosts(options.getCassandraHost())
+                .withPort(options.getCassandraPort())
+                .withKeyspace(KEYSPACE)
+                .withEntity(Scientist.class));
+
+    pipelineWrite.run().waitUntilFinish();
   }
 
-  /**
-   * Simple matcher.
-   */
-  static class CassandraMatcher extends TypeSafeMatcher<PipelineResult>
-      implements SerializableMatcher<PipelineResult> {
+  private void runRead() {
+    PCollection<Scientist> output =
+        pipelineRead.apply(
+            CassandraIO.<Scientist>read()
+                .withHosts(options.getCassandraHost())
+                .withPort(options.getCassandraPort())
+                .withMinNumberOfSplits(20)
+                .withKeyspace(KEYSPACE)
+                .withTable(TABLE)
+                .withEntity(Scientist.class)
+                .withCoder(SerializableCoder.of(Scientist.class)));
+
+    PCollection<String> consolidatedHashcode =
+        output
+            .apply(ParDo.of(new SelectNameFn()))
+            .apply("Hash row contents", Combine.globally(new 
HashingFn()).withoutDefaults());
+
+    PAssert.thatSingleton(consolidatedHashcode)
+        
.isEqualTo(TestRow.getExpectedHashForRowCount(options.getNumberOfRecords()));
+
+    pipelineRead.run().waitUntilFinish();
+  }
 
-    private final String tableName;
-    private final Cluster cluster;
+  private static Cluster getCluster(CassandraIOITOptions options) {
+    return Cluster.builder()
+        .addContactPoints(options.getCassandraHost().toArray(new String[0]))
+        .withPort(options.getCassandraPort())
+        .build();
+  }
 
-    CassandraMatcher(Cluster cluster, String tableName) {
-      this.cluster = cluster;
-      this.tableName = tableName;
-    }
-
-    @Override
-    protected boolean matchesSafely(PipelineResult pipelineResult) {
-      pipelineResult.waitUntilFinish();
-      Session session = cluster.connect();
-      ResultSet result = session.execute("select id,name from " + 
CassandraTestDataSet.KEYSPACE
-          + "." + tableName);
-      List<Row> rows = result.all();
-      if (rows.size() != 1000) {
-        return false;
-      }
-      for (Row row : rows) {
-        if (!row.getString("name").matches("Name.*")) {
-          return false;
-        }
-      }
-      return true;
+  private static void createTable(CassandraIOITOptions options, String 
keyspace, String tableName) {
+    try (Cluster cluster = getCluster(options);
+        Session session = cluster.connect()) {
+      LOG.info("Create {} keyspace if not exists", keyspace);
+      session.execute(
+          "CREATE KEYSPACE IF NOT EXISTS "
+              + KEYSPACE
+              + " WITH REPLICATION = "
+              + "{'class':'SimpleStrategy', 'replication_factor':3};");
+
+      session.execute("USE " + keyspace);
+
+      LOG.info("Create {} table if not exists", tableName);
+      session.execute(
+          "CREATE TABLE IF NOT EXISTS "
+              + tableName
+              + "(id bigint, name text, PRIMARY "
+              + "KEY(id))");
     }
+  }
 
-    @Override
-    public void describeTo(Description description) {
-      description.appendText("Expected Cassandra record pattern is (Name.*)");
+  private static void dropTable(CassandraIOITOptions options, String keyspace, 
String table) {
+    try (Cluster cluster = getCluster(options);
+        Session session = cluster.connect()) {
+      session.execute("DROP KEYSPACE IF EXISTS " + keyspace);
+      session.execute("DROP TABLE IF EXISTS " + keyspace + "." + table);
     }
   }
 
-  /**
-   * Simple Cassandra entity representing a scientist. Used for read test.
-   */
-  @Table(name = CassandraTestDataSet.TABLE_READ_NAME, keyspace = 
CassandraTestDataSet.KEYSPACE)
-  static class Scientist implements Serializable {
+  /** Simple Cassandra entity representing a scientist. Used for read test. */
+  @Table(name = TABLE, keyspace = KEYSPACE)
+  private static final class Scientist implements Serializable {
     @PartitionKey
     @Column(name = "id")
-    final int id;
+    final long id;
 
     @Column(name = "name")
     final String name;
 
-    Scientist(int id, String name) {
+    Scientist() {
+      // Empty constructor needed for deserialization from Cassandra
+      this(0, null);
+    }
+
+    Scientist(long id, String name) {
       this.id = id;
       this.name = name;
     }
-  }
-
-  /**
-   * Simple Cassandra entity representing a scientist, used for write test.
-   */
-  @Table(name = CassandraTestDataSet.TABLE_WRITE_NAME, keyspace = 
CassandraTestDataSet.KEYSPACE)
-  static class ScientistForWrite implements Serializable {
-    @PartitionKey
-    @Column(name = "id")
-    Integer id;
-
-    @Column(name = "name")
-    String name;
 
     @Override
     public String toString() {
-      return id + ":" + name;
+      return id + ": " + name;
+    }
+  }
+
+  private static class CreateScientistFn extends DoFn<TestRow, Scientist> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      c.output(new Scientist(c.element().id(), c.element().name()));
+    }
+  }
+
+  private static class SelectNameFn extends DoFn<Scientist, String> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      c.output(c.element().name);
     }
   }
 }
diff --git 
a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraTestDataSet.java
 
b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraTestDataSet.java
deleted file mode 100644
index 2384bbc718c..00000000000
--- 
a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraTestDataSet.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.cassandra;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.Session;
-import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Manipulates test data used by the {@link CassandraIO} tests.
- *
- * <p>This is independent from the tests so that for read tests it can be run 
separately after
- * data store creation rather than every time (which can be more fragile).
- */
-public class CassandraTestDataSet {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(CassandraTestDataSet.class);
-
-  /**
-   * Use this to create the read tables before IT read tests.
-   *
-   * <p>To invoke this class, you can use this command line:
-   * (run from the cassandra root directory)
-   * mvn test-compile exec:java 
-Dexec.mainClass=org.apache.beam.sdk.io.cassandra
-   * .CassandraTestDataSet \
-   *   -Dexec.args="--cassandraHost=localhost --cassandraPort=9042 \
-   *   -Dexec.classpathScope=test
-   * @param args Please pass options from IOTestPipelineOptions used for 
connection to Cassandra as
-   * shown above.
-   */
-  public static void main(String[] args) {
-    PipelineOptionsFactory.register(IOTestPipelineOptions.class);
-    IOTestPipelineOptions options =
-        PipelineOptionsFactory.fromArgs(args).as(IOTestPipelineOptions.class);
-
-    createDataTable(options);
-  }
-
-  public static final String KEYSPACE = "BEAM";
-  public static final String TABLE_READ_NAME = "BEAM_READ_TEST";
-  public static final String TABLE_WRITE_NAME = "BEAM_WRITE_TEST";
-
-  private static void createDataTable(IOTestPipelineOptions options) {
-    createTable(options, TABLE_READ_NAME);
-    insertTestData(options, TABLE_READ_NAME);
-    createTable(options, TABLE_WRITE_NAME);
-  }
-
-  public static Cluster getCluster(IOTestPipelineOptions options) {
-    return Cluster.builder()
-        .addContactPoint(options.getCassandraHost())
-        .withPort(options.getCassandraPort())
-        .build();
-  }
-
-  private static void createTable(IOTestPipelineOptions options, String 
tableName) {
-    try (Cluster cluster = getCluster(options); Session session = 
cluster.connect()) {
-      LOG.info("Create {} keyspace if not exists", KEYSPACE);
-      session.execute("CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE + " WITH 
REPLICATION = "
-              + "{'class':'SimpleStrategy', 'replication_factor':3};");
-
-      session.execute("USE " + KEYSPACE);
-
-      LOG.info("Create {} table if not exists", tableName);
-      session.execute("CREATE TABLE IF NOT EXISTS " + tableName + "(id int, 
name text, PRIMARY "
-              + "KEY(id))");
-    }
-  }
-
-  private static void insertTestData(IOTestPipelineOptions options, String 
tableName) {
-    try (Cluster cluster = getCluster(options); Session session = 
cluster.connect()) {
-      LOG.info("Insert test dataset");
-      String[] scientists = {
-        "Lovelace",
-        "Franklin",
-        "Meitner",
-        "Hopper",
-        "Curie",
-        "Faraday",
-        "Newton",
-        "Bohr",
-        "Galilei",
-        "Maxwell"
-      };
-      for (int i = 0; i < 1000; i++) {
-        int index = i % scientists.length;
-        session.execute("INSERT INTO " + KEYSPACE + "." + tableName + "(id, 
name) values("
-                + i + ",'" + scientists[index] + "');");
-      }
-    }
-  }
-
-  public static void cleanUpDataTable(IOTestPipelineOptions options) {
-    try (Cluster cluster = getCluster(options); Session session = 
cluster.connect()) {
-      session.execute("TRUNCATE TABLE " + KEYSPACE + "." + TABLE_WRITE_NAME);
-    }
-  }
-
-}
diff --git 
a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
 
b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
index ab23ca40cd9..f54e62f1b56 100644
--- 
a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
+++ 
b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
@@ -44,17 +44,6 @@
   String getSolrZookeeperServer();
   void setSolrZookeeperServer(String value);
 
-  /* Cassandra */
-  @Description("Host for Cassandra server (host name/ip address)")
-  @Default.String("cassandra-host")
-  String getCassandraHost();
-  void setCassandraHost(String host);
-
-  @Description("Port for Cassandra server")
-  @Default.Integer(7001)
-  Integer getCassandraPort();
-  void setCassandraPort(Integer port);
-
   /* Used by most IOIT */
   @Description("Number records that will be written and read by the test")
   @Default.Integer(100000)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 112792)
    Time Spent: 4h 10m  (was: 4h)

> Change CassandraIOIT to write-then-read Performance Tests
> ---------------------------------------------------------
>
>                 Key: BEAM-4399
>                 URL: https://issues.apache.org/jira/browse/BEAM-4399
>             Project: Beam
>          Issue Type: Sub-task
>          Components: testing
>            Reporter: Łukasz Gajowy
>            Assignee: Ismaël Mejía
>            Priority: Minor
>          Time Spent: 4h 10m
>  Remaining Estimate: 0h
>
> CassandraIOIT is different than other IOITs (such as JdbcIOIT or MongodbIOIT) 
> and does not fulfil the rules described in the documentation: 
> [https://beam.apache.org/documentation/io/testing/#i-o-transform-integration-tests].
>  
> We should make it coherent with other tests, more specifically: 
>  - write it in writeThenReadAll style
>  - enable running it with Perfkit
>  - provide Jenkins job to run it periodically



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to