Repository: beam
Updated Branches:
  refs/heads/master 8beea73c1 -> d255fa25f
Improve HadoopInputFormatIO DisplayData and Cassandra tests

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c08b7b17
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c08b7b17
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c08b7b17

Branch: refs/heads/master
Commit: c08b7b1771481b77c94ec78a96db5b34fec29841
Parents: 8beea73
Author: Dipti Kulkarni <dipti_dkulka...@persistent.co.in>
Authored: Mon Apr 10 15:43:12 2017 +0530
Committer: Dan Halperin <dhalp...@google.com>
Committed: Tue Apr 11 10:20:15 2017 -0700

----------------------------------------------------------------------
 .../hadoop/inputformat/HadoopInputFormatIO.java |  33 +--
 .../inputformat/HadoopInputFormatIOTest.java    |  99 ++++++---
 sdks/java/io/hadoop/jdk1.8-tests/.toDelete      |   0
 .../HIFIOWithEmbeddedCassandraTest.java         | 214 +++++++++++++++++++
 4 files changed, 306 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c08b7b17/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java b/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
index 675f4bf..61dc1bf 100644
--- a/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
+++ b/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
@@ -36,9 +36,7 @@ import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -360,20 +358,6 @@ public class HadoopInputFormatIO {
             + e.getMessage(), e);
       }
     }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      super.populateDisplayData(builder);
-      if (getConfiguration().getHadoopConfiguration() != null) {
-        Iterator<Entry<String, String>> configProperties = getConfiguration()
-            .getHadoopConfiguration().iterator();
-        while (configProperties.hasNext()) {
-          Entry<String, String> property = configProperties.next();
-          builder.addIfNotNull(DisplayData.item(property.getKey(), property.getValue())
-              .withLabel(property.getKey()));
-        }
-      }
-    }
   }
 
   /**
@@ -447,6 +431,23 @@ public class HadoopInputFormatIO {
     }
 
     @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      Configuration hadoopConfig = getConfiguration().getHadoopConfiguration();
+      if (hadoopConfig != null) {
+        builder.addIfNotNull(DisplayData.item("mapreduce.job.inputformat.class",
+            hadoopConfig.get("mapreduce.job.inputformat.class"))
+            .withLabel("InputFormat Class"));
+        builder.addIfNotNull(DisplayData.item("key.class",
+            hadoopConfig.get("key.class"))
+            .withLabel("Key Class"));
+        builder.addIfNotNull(DisplayData.item("value.class",
+            hadoopConfig.get("value.class"))
+            .withLabel("Value Class"));
+      }
+    }
+
+    @Override
     public List<BoundedSource<KV<K, V>>> splitIntoBundles(long desiredBundleSizeBytes,
         PipelineOptions options) throws Exception {
       // desiredBundleSizeBytes is not being considered as splitting based on this
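
The new populateDisplayData() above surfaces only the three Hadoop Configuration
entries that HadoopInputFormatIO requires, instead of iterating over the entire
Configuration as the removed Read implementation did. As a minimal, hedged sketch
of the kind of Configuration those display items come from (TextInputFormat,
LongWritable, and Text are illustrative stand-ins, not part of this change):

    Configuration conf = new Configuration();
    // The three entries surfaced by populateDisplayData() above.
    conf.setClass("mapreduce.job.inputformat.class",
        org.apache.hadoop.mapreduce.lib.input.TextInputFormat.class,
        org.apache.hadoop.mapreduce.InputFormat.class);                 // "InputFormat Class"
    conf.setClass("key.class",
        org.apache.hadoop.io.LongWritable.class, Object.class);        // "Key Class"
    conf.setClass("value.class",
        org.apache.hadoop.io.Text.class, Object.class);                // "Value Class"
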
http://git-wip-us.apache.org/repos/asf/beam/blob/c08b7b17/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
index 2f2857b..3a4a99d 100644
--- a/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
+++ b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
@@ -21,9 +21,7 @@ import static org.junit.Assert.assertThat;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map.Entry;
 
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.Coder;
@@ -157,24 +155,6 @@ public class HadoopInputFormatIOTest {
   }
 
   /**
-   * This test validates functionality of {@link HadoopInputFormatIO.Read#populateDisplayData()
-   * populateDisplayData()}.
-   */
-  @Test
-  public void testReadDisplayData() {
-    HadoopInputFormatIO.Read<String, String> read = HadoopInputFormatIO.<String, String>read()
-        .withConfiguration(serConf.getHadoopConfiguration())
-        .withKeyTranslation(myKeyTranslate)
-        .withValueTranslation(myValueTranslate);
-    DisplayData displayData = DisplayData.from(read);
-    Iterator<Entry<String, String>> propertyElement = serConf.getHadoopConfiguration().iterator();
-    while (propertyElement.hasNext()) {
-      Entry<String, String> element = propertyElement.next();
-      assertThat(displayData, hasDisplayItem(element.getKey(), element.getValue()));
-    }
-  }
-
-  /**
    * This test validates {@link HadoopInputFormatIO.Read Read} transform object creation fails with
    * null configuration. {@link HadoopInputFormatIO.Read#withConfiguration() withConfiguration()}
    * method checks configuration is null and throws exception if it is null.
@@ -415,6 +395,32 @@ public class HadoopInputFormatIOTest {
   }
 
   /**
+   * This test validates functionality of
+   * {@link HadoopInputFormatIO.HadoopInputFormatBoundedSource#populateDisplayData()
+   * populateDisplayData()}.
+   */
+  @Test
+  public void testReadDisplayData() {
+    HadoopInputFormatBoundedSource<Text, Employee> boundedSource =
+        new HadoopInputFormatBoundedSource<Text, Employee>(
+            serConf,
+            WritableCoder.of(Text.class),
+            AvroCoder.of(Employee.class),
+            null, // No key translation required.
+            null, // No value translation required.
+            new SerializableSplit());
+    DisplayData displayData = DisplayData.from(boundedSource);
+    assertThat(
+        displayData,
+        hasDisplayItem("mapreduce.job.inputformat.class",
+            serConf.getHadoopConfiguration().get("mapreduce.job.inputformat.class")));
+    assertThat(displayData,
+        hasDisplayItem("key.class", serConf.getHadoopConfiguration().get("key.class")));
+    assertThat(displayData,
+        hasDisplayItem("value.class", serConf.getHadoopConfiguration().get("value.class")));
+  }
+
+  /**
    * This test validates behavior of {@link HadoopInputFormatBoundedSource} if RecordReader object
    * creation fails.
    */
@@ -472,7 +478,8 @@ public class HadoopInputFormatIOTest {
    */
   @Test
   public void testReadersStartWhenZeroRecords() throws Exception {
-    InputFormat<Text, Employee> mockInputFormat = Mockito.mock(EmployeeInputFormat.class);
+
+    InputFormat mockInputFormat = Mockito.mock(EmployeeInputFormat.class);
     EmployeeRecordReader mockReader = Mockito.mock(EmployeeRecordReader.class);
     Mockito.when(
         mockInputFormat.createRecordReader(Mockito.any(InputSplit.class),
@@ -487,9 +494,11 @@ public class HadoopInputFormatIOTest {
             null, // No key translation required.
             null, // No value translation required.
             new SerializableSplit(mockInputSplit));
-    BoundedReader<KV<Text, Employee>> boundedReader = boundedSource.createReader(p.getOptions());
-    assertEquals(false, boundedReader.start());
-    assertEquals(Double.valueOf(1), boundedReader.getFractionConsumed());
+    boundedSource.setInputFormatObj(mockInputFormat);
+    BoundedReader<KV<Text, Employee>> reader = boundedSource.createReader(p.getOptions());
+    assertEquals(false, reader.start());
+    assertEquals(Double.valueOf(1), reader.getFractionConsumed());
+    reader.close();
   }
 
   /**
@@ -546,6 +555,48 @@ public class HadoopInputFormatIOTest {
     assertThat(bundleRecords, containsInAnyOrder(referenceRecords.toArray()));
   }
 
+  /**
+   * This test validates the method getFractionConsumed() when a bad progress value is returned
+   * by the InputFormat.
+   */
+  @Test
+  public void testGetFractionConsumedForBadProgressValue() throws Exception {
+    InputFormat<Text, Employee> mockInputFormat = Mockito.mock(EmployeeInputFormat.class);
+    EmployeeRecordReader mockReader = Mockito.mock(EmployeeRecordReader.class);
+    Mockito.when(
+        mockInputFormat.createRecordReader(Mockito.any(InputSplit.class),
+            Mockito.any(TaskAttemptContext.class))).thenReturn(mockReader);
+    Mockito.when(mockReader.nextKeyValue()).thenReturn(true);
+    // Set to a bad value, outside the range 0 to 1.
+    Mockito.when(mockReader.getProgress()).thenReturn(2.0F);
+    InputSplit mockInputSplit = Mockito.mock(NewObjectsEmployeeInputSplit.class);
+    HadoopInputFormatBoundedSource<Text, Employee> boundedSource =
+        new HadoopInputFormatBoundedSource<Text, Employee>(
+            serConf,
+            WritableCoder.of(Text.class),
+            AvroCoder.of(Employee.class),
+            null, // No key translation required.
+            null, // No value translation required.
+            new SerializableSplit(mockInputSplit));
+    boundedSource.setInputFormatObj(mockInputFormat);
+    BoundedReader<KV<Text, Employee>> reader = boundedSource.createReader(p.getOptions());
+    assertEquals(Double.valueOf(0), reader.getFractionConsumed());
+    boolean start = reader.start();
+    assertEquals(true, start);
+    if (start) {
+      boolean advance = reader.advance();
+      assertEquals(null, reader.getFractionConsumed());
+      assertEquals(true, advance);
+      if (advance) {
+        advance = reader.advance();
+        assertEquals(null, reader.getFractionConsumed());
+      }
+    }
+    // Validate that getFractionConsumed() returns null after a few reads, because getProgress()
+    // returns the invalid value 2.0, which is outside the range 0 to 1.
+    assertEquals(null, reader.getFractionConsumed());
+    reader.close();
+  }
+
   /**
    * This test validates that reader and its parent source read the same records.
    */
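
The testGetFractionConsumedForBadProgressValue() test added above pins down the
contract that a progress value outside the range 0 to 1 from the underlying
RecordReader yields a null fraction consumed. A hedged sketch of that guard, with
a hypothetical helper name; this is illustrative, not the actual HadoopInputFormatIO
implementation:

    // Hypothetical helper: sanitize the raw progress reported by a Hadoop
    // RecordReader before exposing it as getFractionConsumed(). Out-of-range
    // values are treated as unknown progress (null).
    private Double getFractionOrNull(org.apache.hadoop.mapreduce.RecordReader<?, ?> recordReader) {
      try {
        float progress = recordReader.getProgress(); // a misbehaving format may return e.g. 2.0f
        return (progress >= 0.0f && progress <= 1.0f) ? (double) progress : null;
      } catch (java.io.IOException | InterruptedException e) {
        return null; // treat a failure to report progress as unknown as well
      }
    }
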
http://git-wip-us.apache.org/repos/asf/beam/blob/c08b7b17/sdks/java/io/hadoop/jdk1.8-tests/.toDelete
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/jdk1.8-tests/.toDelete b/sdks/java/io/hadoop/jdk1.8-tests/.toDelete
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/beam/blob/c08b7b17/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithEmbeddedCassandraTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithEmbeddedCassandraTest.java b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithEmbeddedCassandraTest.java
new file mode 100644
index 0000000..97addcf
--- /dev/null
+++ b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithEmbeddedCassandraTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hadoop.inputformat;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SocketOptions;
+import com.datastax.driver.mapping.annotations.Column;
+import com.datastax.driver.mapping.annotations.Table;
+
+import java.io.Serializable;
+
+import org.apache.beam.sdk.io.hadoop.inputformat.hashing.HashingFn;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.cassandra.service.EmbeddedCassandraService;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests to validate HadoopInputFormatIO for embedded Cassandra instance.
+ */
+@RunWith(JUnit4.class)
+public class HIFIOWithEmbeddedCassandraTest implements Serializable {
+  private static final long serialVersionUID = 1L;
+  private static final String CASSANDRA_KEYSPACE = "beamdb";
+  private static final String CASSANDRA_HOST = "127.0.0.1";
+  private static final String CASSANDRA_TABLE = "scientists";
+  private static final String CASSANDRA_THRIFT_PORT_PROPERTY = "cassandra.input.thrift.port";
+  private static final String CASSANDRA_THRIFT_ADDRESS_PROPERTY = "cassandra.input.thrift.address";
+  private static final String CASSANDRA_PARTITIONER_CLASS_PROPERTY =
+      "cassandra.input.partitioner.class";
+  private static final String CASSANDRA_PARTITIONER_CLASS_VALUE = "Murmur3Partitioner";
+  private static final String CASSANDRA_KEYSPACE_PROPERTY = "cassandra.input.keyspace";
+  private static final String CASSANDRA_COLUMNFAMILY_PROPERTY = "cassandra.input.columnfamily";
+  private static final String CASSANDRA_PORT = "9061";
+  private static transient Cluster cluster;
+  private static transient Session session;
+  private static final long TEST_DATA_ROW_COUNT = 10L;
+  private static EmbeddedCassandraService cassandra = new EmbeddedCassandraService();
+
+  @Rule
+  public final transient TestPipeline p = TestPipeline.create();
+
+  /**
+   * Test to read data from the embedded Cassandra instance and verify that it is read
+   * successfully.
+   * @throws Exception
+   */
+  @Test
+  public void testHIFReadForCassandra() throws Exception {
+    // The expected hashcode was computed once at insertion time and is hardcoded here.
+    String expectedHashCode = "4651110ba1ef2cd3a7315091ca27877b18fceb0e";
+    Configuration conf = getConfiguration();
+    PCollection<KV<Long, String>> cassandraData =
+        p.apply(HadoopInputFormatIO.<Long, String>read().withConfiguration(conf)
+            .withValueTranslation(myValueTranslate));
+    // Verify that the count of data retrieved from Cassandra matches the expected count.
+    PAssert.thatSingleton(cassandraData.apply("Count", Count.<KV<Long, String>>globally()))
+        .isEqualTo(TEST_DATA_ROW_COUNT);
+    PCollection<String> textValues = cassandraData.apply(Values.<String>create());
+    // Verify the output values using checksum comparison.
+    PCollection<String> consolidatedHashcode =
+        textValues.apply(Combine.globally(new HashingFn()).withoutDefaults());
+    PAssert.that(consolidatedHashcode).containsInAnyOrder(expectedHashCode);
+    p.run().waitUntilFinish();
+  }
+
+  SimpleFunction<Row, String> myValueTranslate = new SimpleFunction<Row, String>() {
+    @Override
+    public String apply(Row input) {
+      String scientistRecord = input.getInt("id") + "|" + input.getString("scientist");
+      return scientistRecord;
+    }
+  };
+
+  /**
+   * Test to read data from the embedded Cassandra instance with a CQL query and verify that it
+   * is read successfully.
+   */
+  @Test
+  public void testHIFReadForCassandraQuery() throws Exception {
+    Long expectedCount = 1L;
+    String expectedChecksum = "6a62f24ccce0713004889aec1cf226949482d188";
+    Configuration conf = getConfiguration();
+    conf.set("cassandra.input.cql", "select * from " + CASSANDRA_KEYSPACE + "." + CASSANDRA_TABLE
+        + " where token(id) > ? and token(id) <= ? and scientist='Faraday1' allow filtering");
+    PCollection<KV<Long, String>> cassandraData =
+        p.apply(HadoopInputFormatIO.<Long, String>read().withConfiguration(conf)
+            .withValueTranslation(myValueTranslate));
+    // Verify that the count of data retrieved from Cassandra matches the expected count.
+    PAssert.thatSingleton(cassandraData.apply("Count", Count.<KV<Long, String>>globally()))
+        .isEqualTo(expectedCount);
+    PCollection<String> textValues = cassandraData.apply(Values.<String>create());
+    // Verify the output values using checksum comparison.
+    PCollection<String> consolidatedHashcode =
+        textValues.apply(Combine.globally(new HashingFn()).withoutDefaults());
+    PAssert.that(consolidatedHashcode).containsInAnyOrder(expectedChecksum);
+    p.run().waitUntilFinish();
+  }
+
+  /**
+   * Returns the configuration for CqlInputFormat. Apart from the input format class name, the
+   * key class, and the value class, the mandatory parameters are the thrift port, thrift
+   * address, partitioner class, keyspace, and column family name.
+   */
+  public Configuration getConfiguration() {
+    Configuration conf = new Configuration();
+    conf.set(CASSANDRA_THRIFT_PORT_PROPERTY, CASSANDRA_PORT);
+    conf.set(CASSANDRA_THRIFT_ADDRESS_PROPERTY, CASSANDRA_HOST);
+    conf.set(CASSANDRA_PARTITIONER_CLASS_PROPERTY, CASSANDRA_PARTITIONER_CLASS_VALUE);
+    conf.set(CASSANDRA_KEYSPACE_PROPERTY, CASSANDRA_KEYSPACE);
+    conf.set(CASSANDRA_COLUMNFAMILY_PROPERTY, CASSANDRA_TABLE);
+    conf.setClass("mapreduce.job.inputformat.class",
+        org.apache.cassandra.hadoop.cql3.CqlInputFormat.class, InputFormat.class);
+    conf.setClass("key.class", java.lang.Long.class, Object.class);
+    conf.setClass("value.class", com.datastax.driver.core.Row.class, Object.class);
+    return conf;
+  }
+
+  public static void createCassandraData() throws Exception {
+    session.execute("CREATE KEYSPACE " + CASSANDRA_KEYSPACE
+        + " WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':1};");
+    session.execute("USE " + CASSANDRA_KEYSPACE);
+    session.execute("CREATE TABLE " + CASSANDRA_TABLE
+        + "(id int, scientist text, PRIMARY KEY(id));");
+    for (int i = 0; i < TEST_DATA_ROW_COUNT; i++) {
+      session.execute("INSERT INTO " + CASSANDRA_TABLE + "(id, scientist) values(" + i
+          + ", 'Faraday" + i + "');");
+    }
+  }
+
+  @BeforeClass
+  public static void startCassandra() throws Exception {
+    // Start the embedded Cassandra service.
+    cassandra.start();
+    final SocketOptions socketOptions = new SocketOptions();
+    // Setting this to 0 disables read timeouts.
+    socketOptions.setReadTimeoutMillis(0);
+    // This defaults to 5 s. Increase to a minute.
+    socketOptions.setConnectTimeoutMillis(60 * 1000);
+    cluster =
+        Cluster.builder().addContactPoint(CASSANDRA_HOST).withClusterName("beam")
+            .withSocketOptions(socketOptions).build();
+    session = cluster.connect();
+    createCassandraData();
+  }
+
+  @AfterClass
+  public static void stopEmbeddedCassandra() throws Exception {
+    session.close();
+    cluster.close();
+  }
+
+  /**
+   * POJO class for scientist data.
+   */
+  @Table(name = CASSANDRA_TABLE, keyspace = CASSANDRA_KEYSPACE)
+  public static class Scientist implements Serializable {
+    private static final long serialVersionUID = 1L;
+    @Column(name = "scientist")
+    private String name;
+    @Column(name = "id")
+    private int id;
+
+    public String getName() {
+      return name;
+    }
+
+    public void setName(String name) {
+      this.name = name;
+    }
+
+    public int getId() {
+      return id;
+    }
+
+    public void setId(int id) {
+      this.id = id;
+    }
+
+    public String toString() {
+      return id + ":" + name;
+    }
+  }
+}
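
The Scientist POJO at the end of the new test is annotated for the DataStax object
mapper, although the tests themselves read rows through CqlInputFormat. For
reference, a hedged usage sketch of how the mapper could fetch one of the inserted
rows directly, assuming the cassandra-driver-mapping module is on the classpath and
the static `session` is connected as in startCassandra():

    // Look up one of the inserted "scientists" rows via the annotated POJO.
    com.datastax.driver.mapping.MappingManager manager =
        new com.datastax.driver.mapping.MappingManager(session);
    com.datastax.driver.mapping.Mapper<Scientist> mapper = manager.mapper(Scientist.class);
    Scientist scientist = mapper.get(2);   // look up by primary key `id`
    System.out.println(scientist);         // "2:Faraday2", per toString() above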