vmarquez commented on a change in pull request #10546:
URL: https://github.com/apache/beam/pull/10546#discussion_r434304763
##########
File path:
sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
##########
@@ -480,66 +527,22 @@ public void testCustomMapperImplDelete() {
assertEquals(1, counter.intValue());
}
- @Test
- public void testSplit() throws Exception {
- PipelineOptions options = PipelineOptionsFactory.create();
- CassandraIO.Read<Scientist> read =
- CassandraIO.<Scientist>read()
- .withHosts(Collections.singletonList(CASSANDRA_HOST))
- .withPort(cassandraPort)
- .withKeyspace(CASSANDRA_KEYSPACE)
- .withTable(CASSANDRA_TABLE)
- .withEntity(Scientist.class)
- .withCoder(SerializableCoder.of(Scientist.class));
-
- // initialSource will be read without splitting (which does not happen in production)
- // so we need to provide splitQueries to avoid NPE in source.reader.start()
- String splitQuery = QueryBuilder.select().from(CASSANDRA_KEYSPACE,
CASSANDRA_TABLE).toString();
- CassandraIO.CassandraSource<Scientist> initialSource =
- new CassandraIO.CassandraSource<>(read,
Collections.singletonList(splitQuery));
- int desiredBundleSizeBytes = 2048;
- long estimatedSize = initialSource.getEstimatedSizeBytes(options);
- List<BoundedSource<Scientist>> splits =
initialSource.split(desiredBundleSizeBytes, options);
- SourceTestUtils.assertSourcesEqualReferenceSource(initialSource, splits,
options);
- float expectedNumSplitsloat =
- (float) initialSource.getEstimatedSizeBytes(options) /
desiredBundleSizeBytes;
- long sum = 0;
-
- for (BoundedSource<Scientist> subSource : splits) {
- sum += subSource.getEstimatedSizeBytes(options);
- }
-
- // due to division and cast estimateSize != sum but will be close. Exact equals checked below
- assertEquals((long) (estimatedSize / splits.size()) * splits.size(), sum);
-
- int expectedNumSplits = (int) Math.ceil(expectedNumSplitsloat);
- assertEquals("Wrong number of splits", expectedNumSplits, splits.size());
- int emptySplits = 0;
- for (BoundedSource<Scientist> subSource : splits) {
- if (readFromSource(subSource, options).isEmpty()) {
- emptySplits += 1;
- }
- }
- assertThat(
- "There are too many empty splits, parallelism is sub-optimal",
- emptySplits,
- lessThan((int) (ACCEPTABLE_EMPTY_SPLITS_PERCENTAGE * splits.size())));
- }
-
private List<Row> getRows(String table) {
ResultSet result =
session.execute(
String.format("select person_id,person_name from %s.%s",
CASSANDRA_KEYSPACE, table));
return result.all();
}
+ // TEMP TEST
Review comment:
Oops, I think this was left in from when I was experimenting with
something; I will remove it.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]