http://git-wip-us.apache.org/repos/asf/beam/blob/174436bc/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java new file mode 100644 index 0000000..2f2857b --- /dev/null +++ b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java @@ -0,0 +1,797 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.beam.sdk.io.hadoop.inputformat; + +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; + +import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.BoundedSource.BoundedReader; +import org.apache.beam.sdk.io.hadoop.WritableCoder; +import org.apache.beam.sdk.io.hadoop.inputformat.EmployeeInputFormat.EmployeeRecordReader; +import org.apache.beam.sdk.io.hadoop.inputformat.EmployeeInputFormat.NewObjectsEmployeeInputSplit; +import org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO.HadoopInputFormatBoundedSource; +import org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO.SerializableConfiguration; +import org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO.SerializableSplit; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.SourceTestUtils; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import 
org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mockito; + +/** + * Unit tests for {@link HadoopInputFormatIO}. + */ +@RunWith(JUnit4.class) +public class HadoopInputFormatIOTest { + static SerializableConfiguration serConf; + static SimpleFunction<Text, String> myKeyTranslate; + static SimpleFunction<Employee, String> myValueTranslate; + + @Rule public final transient TestPipeline p = TestPipeline.create(); + @Rule public ExpectedException thrown = ExpectedException.none(); + + private PBegin input = PBegin.in(p); + + @BeforeClass + public static void setUp() throws IOException, InterruptedException { + serConf = loadTestConfiguration( + EmployeeInputFormat.class, + Text.class, + Employee.class); + myKeyTranslate = new SimpleFunction<Text, String>() { + @Override + public String apply(Text input) { + return input.toString(); + } + }; + myValueTranslate = new SimpleFunction<Employee, String>() { + @Override + public String apply(Employee input) { + return input.getEmpName() + "_" + input.getEmpAddress(); + } + }; + } + + @Test + public void testReadBuildsCorrectly() { + HadoopInputFormatIO.Read<String, String> read = HadoopInputFormatIO.<String, String>read() + .withConfiguration(serConf.getHadoopConfiguration()) + .withKeyTranslation(myKeyTranslate) + .withValueTranslation(myValueTranslate); + assertEquals(serConf.getHadoopConfiguration(), + read.getConfiguration().getHadoopConfiguration()); + assertEquals(myKeyTranslate, read.getKeyTranslationFunction()); + assertEquals(myValueTranslate, read.getValueTranslationFunction()); + assertEquals(myValueTranslate.getOutputTypeDescriptor(), read.getValueTypeDescriptor()); + assertEquals(myKeyTranslate.getOutputTypeDescriptor(), read.getKeyTypeDescriptor()); + } + + /** + * This test validates {@link HadoopInputFormatIO.Read Read} builds correctly in different order + * of with configuration/key translation/value translation. This test also validates output + * PCollection key/value classes are set correctly even if Hadoop configuration is set after + * setting key/value translation. + */ + @Test + public void testReadBuildsCorrectlyInDifferentOrder() { + HadoopInputFormatIO.Read<String, String> read = + HadoopInputFormatIO.<String, String>read() + .withValueTranslation(myValueTranslate) + .withConfiguration(serConf.getHadoopConfiguration()) + .withKeyTranslation(myKeyTranslate); + assertEquals(serConf.getHadoopConfiguration(), + read.getConfiguration().getHadoopConfiguration()); + assertEquals(myKeyTranslate, read.getKeyTranslationFunction()); + assertEquals(myValueTranslate, read.getValueTranslationFunction()); + assertEquals(myKeyTranslate.getOutputTypeDescriptor(), read.getKeyTypeDescriptor()); + assertEquals(myValueTranslate.getOutputTypeDescriptor(), read.getValueTypeDescriptor()); + } + + /** + * This test validates {@link HadoopInputFormatIO.Read Read} object creation if + * {@link HadoopInputFormatIO.Read#withConfiguration() withConfiguration()} is called more than + * once. 
+ * @throws InterruptedException + * @throws IOException + */ + @Test + public void testReadBuildsCorrectlyIfWithConfigurationIsCalledMoreThanOneTime() + throws IOException, InterruptedException { + SerializableConfiguration diffConf = + loadTestConfiguration( + EmployeeInputFormat.class, + Employee.class, + Text.class); + HadoopInputFormatIO.Read<String, String> read = HadoopInputFormatIO.<String, String>read() + .withConfiguration(serConf.getHadoopConfiguration()) + .withKeyTranslation(myKeyTranslate) + .withConfiguration(diffConf.getHadoopConfiguration()); + assertEquals(diffConf.getHadoopConfiguration(), + read.getConfiguration().getHadoopConfiguration()); + assertEquals(myKeyTranslate, read.getKeyTranslationFunction()); + assertEquals(null, read.getValueTranslationFunction()); + assertEquals(myKeyTranslate.getOutputTypeDescriptor(), read.getKeyTypeDescriptor()); + assertEquals(diffConf.getHadoopConfiguration().getClass("value.class", Object.class), read + .getValueTypeDescriptor().getRawType()); + } + + /** + * This test validates functionality of {@link HadoopInputFormatIO.Read#populateDisplayData() + * populateDisplayData()}. + */ + @Test + public void testReadDisplayData() { + HadoopInputFormatIO.Read<String, String> read = HadoopInputFormatIO.<String, String>read() + .withConfiguration(serConf.getHadoopConfiguration()) + .withKeyTranslation(myKeyTranslate) + .withValueTranslation(myValueTranslate); + DisplayData displayData = DisplayData.from(read); + Iterator<Entry<String, String>> propertyElement = serConf.getHadoopConfiguration().iterator(); + while (propertyElement.hasNext()) { + Entry<String, String> element = propertyElement.next(); + assertThat(displayData, hasDisplayItem(element.getKey(), element.getValue())); + } + } + + /** + * This test validates {@link HadoopInputFormatIO.Read Read} transform object creation fails with + * null configuration. {@link HadoopInputFormatIO.Read#withConfiguration() withConfiguration()} + * method checks configuration is null and throws exception if it is null. + */ + @Test + public void testReadObjectCreationFailsIfConfigurationIsNull() { + thrown.expect(NullPointerException.class); + HadoopInputFormatIO.<Text, Employee>read() + .withConfiguration(null); + } + + /** + * This test validates {@link HadoopInputFormatIO.Read Read} transform object creation with only + * configuration. + */ + @Test + public void testReadObjectCreationWithConfiguration() { + HadoopInputFormatIO.Read<Text, Employee> read = HadoopInputFormatIO.<Text, Employee>read() + .withConfiguration(serConf.getHadoopConfiguration()); + assertEquals(serConf.getHadoopConfiguration(), + read.getConfiguration().getHadoopConfiguration()); + assertEquals(null, read.getKeyTranslationFunction()); + assertEquals(null, read.getValueTranslationFunction()); + assertEquals(serConf.getHadoopConfiguration().getClass("key.class", Object.class), read + .getKeyTypeDescriptor().getRawType()); + assertEquals(serConf.getHadoopConfiguration().getClass("value.class", Object.class), read + .getValueTypeDescriptor().getRawType()); + } + + /** + * This test validates {@link HadoopInputFormatIO.Read Read} transform object creation fails with + * configuration and null key translation. {@link HadoopInputFormatIO.Read#withKeyTranslation() + * withKeyTranslation()} checks keyTranslation is null and throws exception if it null value is + * passed. 
+ */ + @Test + public void testReadObjectCreationFailsIfKeyTranslationFunctionIsNull() { + thrown.expect(NullPointerException.class); + HadoopInputFormatIO.<String, Employee>read() + .withConfiguration(serConf.getHadoopConfiguration()) + .withKeyTranslation(null); + } + + /** + * This test validates {@link HadoopInputFormatIO.Read Read} transform object creation with + * configuration and key translation. + */ + @Test + public void testReadObjectCreationWithConfigurationKeyTranslation() { + HadoopInputFormatIO.Read<String, Employee> read = HadoopInputFormatIO.<String, Employee>read() + .withConfiguration(serConf.getHadoopConfiguration()) + .withKeyTranslation(myKeyTranslate); + assertEquals(serConf.getHadoopConfiguration(), + read.getConfiguration().getHadoopConfiguration()); + assertEquals(myKeyTranslate, read.getKeyTranslationFunction()); + assertEquals(null, read.getValueTranslationFunction()); + assertEquals(myKeyTranslate.getOutputTypeDescriptor().getRawType(), + read.getKeyTypeDescriptor().getRawType()); + assertEquals(serConf.getHadoopConfiguration().getClass("value.class", Object.class), + read.getValueTypeDescriptor().getRawType()); + } + + /** + * This test validates {@link HadoopInputFormatIO.Read Read} transform object creation fails with + * configuration and null value translation. + * {@link HadoopInputFormatIO.Read#withValueTranslation() withValueTranslation()} checks + * valueTranslation is null and throws exception if null value is passed. + */ + @Test + public void testReadObjectCreationFailsIfValueTranslationFunctionIsNull() { + thrown.expect(NullPointerException.class); + HadoopInputFormatIO.<Text, String>read() + .withConfiguration(serConf.getHadoopConfiguration()) + .withValueTranslation(null); + } + + /** + * This test validates {@link HadoopInputFormatIO.Read Read} transform object creation with + * configuration and value translation. + */ + @Test + public void testReadObjectCreationWithConfigurationValueTranslation() { + HadoopInputFormatIO.Read<Text, String> read = HadoopInputFormatIO.<Text, String>read() + .withConfiguration(serConf.getHadoopConfiguration()) + .withValueTranslation(myValueTranslate); + assertEquals(serConf.getHadoopConfiguration(), + read.getConfiguration().getHadoopConfiguration()); + assertEquals(null, read.getKeyTranslationFunction()); + assertEquals(myValueTranslate, read.getValueTranslationFunction()); + assertEquals(serConf.getHadoopConfiguration().getClass("key.class", Object.class), + read.getKeyTypeDescriptor().getRawType()); + assertEquals(myValueTranslate.getOutputTypeDescriptor().getRawType(), + read.getValueTypeDescriptor().getRawType()); + } + + /** + * This test validates {@link HadoopInputFormatIO.Read Read} transform object creation with + * configuration, key translation and value translation. 
+ */ + @Test + public void testReadObjectCreationWithConfigurationKeyTranslationValueTranslation() { + HadoopInputFormatIO.Read<String, String> read = HadoopInputFormatIO.<String, String>read() + .withConfiguration(serConf.getHadoopConfiguration()) + .withKeyTranslation(myKeyTranslate) + .withValueTranslation(myValueTranslate); + assertEquals(serConf.getHadoopConfiguration(), + read.getConfiguration().getHadoopConfiguration()); + assertEquals(myKeyTranslate, read.getKeyTranslationFunction()); + assertEquals(myValueTranslate, read.getValueTranslationFunction()); + assertEquals(myKeyTranslate.getOutputTypeDescriptor().getRawType(), + read.getKeyTypeDescriptor().getRawType()); + assertEquals(myValueTranslate.getOutputTypeDescriptor().getRawType(), + read.getValueTypeDescriptor().getRawType()); + } + + /** + * This test validates functionality of {@link HadoopInputFormatIO.Read#validate() + * Read.validate()} function when Read transform is created without calling + * {@link HadoopInputFormatIO.Read#withConfiguration() withConfiguration()}. + */ + @Test + public void testReadValidationFailsMissingConfiguration() { + HadoopInputFormatIO.Read<String, String> read = HadoopInputFormatIO.<String, String>read(); + thrown.expect(NullPointerException.class); + read.validate(input); + } + + /** + * This test validates functionality of {@link HadoopInputFormatIO.Read#withConfiguration() + * withConfiguration()} function when Hadoop InputFormat class is not provided by the user in + * configuration. + */ + @Test + public void testReadValidationFailsMissingInputFormatInConf() { + Configuration configuration = new Configuration(); + configuration.setClass("key.class", Text.class, Object.class); + configuration.setClass("value.class", Employee.class, Object.class); + thrown.expect(NullPointerException.class); + HadoopInputFormatIO.<Text, Employee>read() + .withConfiguration(configuration); + } + + /** + * This test validates functionality of {@link HadoopInputFormatIO.Read#withConfiguration() + * withConfiguration()} function when key class is not provided by the user in configuration. + */ + @Test + public void testReadValidationFailsMissingKeyClassInConf() { + Configuration configuration = new Configuration(); + configuration.setClass("mapreduce.job.inputformat.class", EmployeeInputFormat.class, + InputFormat.class); + configuration.setClass("value.class", Employee.class, Object.class); + thrown.expect(NullPointerException.class); + HadoopInputFormatIO.<Text, Employee>read() + .withConfiguration(configuration); + } + + /** + * This test validates functionality of {@link HadoopInputFormatIO.Read#withConfiguration() + * withConfiguration()} function when value class is not provided by the user in configuration. + */ + @Test + public void testReadValidationFailsMissingValueClassInConf() { + Configuration configuration = new Configuration(); + configuration.setClass("mapreduce.job.inputformat.class", EmployeeInputFormat.class, + InputFormat.class); + configuration.setClass("key.class", Text.class, Object.class); + thrown.expect(NullPointerException.class); + HadoopInputFormatIO.<Text, Employee>read().withConfiguration(configuration); + } + + /** + * This test validates functionality of {@link HadoopInputFormatIO.Read#validate() + * Read.validate()} function when myKeyTranslate's (simple function provided by user for key + * translation) input type is not same as Hadoop InputFormat's keyClass(Which is property set in + * configuration as "key.class"). 
+ */ + @Test + public void testReadValidationFailsWithWrongInputTypeKeyTranslationFunction() { + SimpleFunction<LongWritable, String> myKeyTranslateWithWrongInputType = + new SimpleFunction<LongWritable, String>() { + @Override + public String apply(LongWritable input) { + return input.toString(); + } + }; + HadoopInputFormatIO.Read<String, Employee> read = HadoopInputFormatIO.<String, Employee>read() + .withConfiguration(serConf.getHadoopConfiguration()) + .withKeyTranslation(myKeyTranslateWithWrongInputType); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage(String.format( + "Key translation's input type is not same as hadoop InputFormat : %s key " + "class : %s", + serConf.getHadoopConfiguration().getClass("mapreduce.job.inputformat.class", + InputFormat.class), serConf.getHadoopConfiguration() + .getClass("key.class", Object.class))); + read.validate(input); + } + + /** + * This test validates functionality of {@link HadoopInputFormatIO.Read#validate() + * Read.validate()} function when myValueTranslate's (simple function provided by user for value + * translation) input type is not same as Hadoop InputFormat's valueClass(Which is property set in + * configuration as "value.class"). + */ + @Test + public void testReadValidationFailsWithWrongInputTypeValueTranslationFunction() { + SimpleFunction<LongWritable, String> myValueTranslateWithWrongInputType = + new SimpleFunction<LongWritable, String>() { + @Override + public String apply(LongWritable input) { + return input.toString(); + } + }; + HadoopInputFormatIO.Read<Text, String> read = + HadoopInputFormatIO.<Text, String>read() + .withConfiguration(serConf.getHadoopConfiguration()) + .withValueTranslation(myValueTranslateWithWrongInputType); + String expectedMessage = + String.format( + "Value translation's input type is not same as hadoop InputFormat : " + + "%s value class : %s", + serConf.getHadoopConfiguration().getClass("mapreduce.job.inputformat.class", + InputFormat.class), + serConf.getHadoopConfiguration().getClass("value.class", Object.class)); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage(expectedMessage); + read.validate(input); + } + + @Test + public void testReadingData() throws Exception { + HadoopInputFormatIO.Read<Text, Employee> read = HadoopInputFormatIO.<Text, Employee>read() + .withConfiguration(serConf.getHadoopConfiguration()); + List<KV<Text, Employee>> expected = TestEmployeeDataSet.getEmployeeData(); + PCollection<KV<Text, Employee>> actual = p.apply("ReadTest", read); + PAssert.that(actual).containsInAnyOrder(expected); + p.run(); + } + + /** + * This test validates behavior of {@link HadoopInputFormatBoundedSource} if RecordReader object + * creation fails. + */ + @Test + public void testReadIfCreateRecordReaderFails() throws Exception { + thrown.expect(Exception.class); + thrown.expectMessage("Exception in creating RecordReader"); + InputFormat<Text, Employee> mockInputFormat = Mockito.mock(EmployeeInputFormat.class); + Mockito.when( + mockInputFormat.createRecordReader(Mockito.any(InputSplit.class), + Mockito.any(TaskAttemptContext.class))).thenThrow( + new IOException("Exception in creating RecordReader")); + HadoopInputFormatBoundedSource<Text, Employee> boundedSource = + new HadoopInputFormatBoundedSource<Text, Employee>( + serConf, + WritableCoder.of(Text.class), + AvroCoder.of(Employee.class), + null, // No key translation required. + null, // No value translation required. 
+ new SerializableSplit()); + boundedSource.setInputFormatObj(mockInputFormat); + SourceTestUtils.readFromSource(boundedSource, p.getOptions()); + } + + /** + * This test validates behavior of HadoopInputFormatSource if + * {@link InputFormat#createRecordReader() createRecordReader()} of InputFormat returns null. + */ + @Test + public void testReadWithNullCreateRecordReader() throws Exception { + InputFormat<Text, Employee> mockInputFormat = Mockito.mock(EmployeeInputFormat.class); + thrown.expect(IOException.class); + thrown.expectMessage(String.format("Null RecordReader object returned by %s", + mockInputFormat.getClass())); + Mockito.when( + mockInputFormat.createRecordReader(Mockito.any(InputSplit.class), + Mockito.any(TaskAttemptContext.class))).thenReturn(null); + HadoopInputFormatBoundedSource<Text, Employee> boundedSource = + new HadoopInputFormatBoundedSource<Text, Employee>( + serConf, + WritableCoder.of(Text.class), + AvroCoder.of(Employee.class), + null, // No key translation required. + null, // No value translation required. + new SerializableSplit()); + boundedSource.setInputFormatObj(mockInputFormat); + SourceTestUtils.readFromSource(boundedSource, p.getOptions()); + } + + /** + * This test validates behavior of + * {@link HadoopInputFormatBoundedSource.HadoopInputFormatReader#start() start()} method if + * InputFormat's {@link InputFormat#getSplits() getSplits()} returns InputSplitList having zero + * records. + */ + @Test + public void testReadersStartWhenZeroRecords() throws Exception { + InputFormat<Text, Employee> mockInputFormat = Mockito.mock(EmployeeInputFormat.class); + EmployeeRecordReader mockReader = Mockito.mock(EmployeeRecordReader.class); + Mockito.when( + mockInputFormat.createRecordReader(Mockito.any(InputSplit.class), + Mockito.any(TaskAttemptContext.class))).thenReturn(mockReader); + Mockito.when(mockReader.nextKeyValue()).thenReturn(false); + InputSplit mockInputSplit = Mockito.mock(NewObjectsEmployeeInputSplit.class); + HadoopInputFormatBoundedSource<Text, Employee> boundedSource = + new HadoopInputFormatBoundedSource<Text, Employee>( + serConf, + WritableCoder.of(Text.class), + AvroCoder.of(Employee.class), + null, // No key translation required. + null, // No value translation required. + new SerializableSplit(mockInputSplit)); + BoundedReader<KV<Text, Employee>> boundedReader = boundedSource.createReader(p.getOptions()); + assertEquals(false, boundedReader.start()); + assertEquals(Double.valueOf(1), boundedReader.getFractionConsumed()); + } + + /** + * This test validates the method getFractionConsumed()- which indicates the progress of the read + * in range of 0 to 1. + */ + @Test + public void testReadersGetFractionConsumed() throws Exception { + List<KV<Text, Employee>> referenceRecords = TestEmployeeDataSet.getEmployeeData(); + HadoopInputFormatBoundedSource<Text, Employee> hifSource = getTestHIFSource( + EmployeeInputFormat.class, + Text.class, + Employee.class, + WritableCoder.of(Text.class), + AvroCoder.of(Employee.class)); + long estimatedSize = hifSource.getEstimatedSizeBytes(p.getOptions()); + // Validate if estimated size is equal to the size of records. + assertEquals(referenceRecords.size(), estimatedSize); + List<BoundedSource<KV<Text, Employee>>> boundedSourceList = + hifSource.splitIntoBundles(0, p.getOptions()); + // Validate if splitIntoBundles() has split correctly. 
+ assertEquals(TestEmployeeDataSet.NUMBER_OF_SPLITS, boundedSourceList.size()); + List<KV<Text, Employee>> bundleRecords = new ArrayList<>(); + for (BoundedSource<KV<Text, Employee>> source : boundedSourceList) { + List<KV<Text, Employee>> elements = new ArrayList<KV<Text, Employee>>(); + BoundedReader<KV<Text, Employee>> reader = source.createReader(p.getOptions()); + float recordsRead = 0; + // When start is not called, getFractionConsumed() should return 0. + assertEquals(Double.valueOf(0), reader.getFractionConsumed()); + boolean start = reader.start(); + assertEquals(true, start); + if (start) { + elements.add(reader.getCurrent()); + boolean advance = reader.advance(); + // Validate if getFractionConsumed() returns the correct fraction based on + // the number of records read in the split. + assertEquals( + Double.valueOf(++recordsRead / TestEmployeeDataSet.NUMBER_OF_RECORDS_IN_EACH_SPLIT), + reader.getFractionConsumed()); + assertEquals(true, advance); + while (advance) { + elements.add(reader.getCurrent()); + advance = reader.advance(); + assertEquals( + Double.valueOf(++recordsRead / TestEmployeeDataSet.NUMBER_OF_RECORDS_IN_EACH_SPLIT), + reader.getFractionConsumed()); + } + bundleRecords.addAll(elements); + } + // Validate if getFractionConsumed() returns 1 after reading is complete. + assertEquals(Double.valueOf(1), reader.getFractionConsumed()); + reader.close(); + } + assertThat(bundleRecords, containsInAnyOrder(referenceRecords.toArray())); + } + + /** + * This test validates that reader and its parent source reads the same records. + */ + @Test + public void testReaderAndParentSourceReadsSameData() throws Exception { + InputSplit mockInputSplit = Mockito.mock(NewObjectsEmployeeInputSplit.class); + HadoopInputFormatBoundedSource<Text, Employee> boundedSource = + new HadoopInputFormatBoundedSource<Text, Employee>( + serConf, + WritableCoder.of(Text.class), + AvroCoder.of(Employee.class), + null, // No key translation required. + null, // No value translation required. + new SerializableSplit(mockInputSplit)); + BoundedReader<KV<Text, Employee>> reader = boundedSource + .createReader(p.getOptions()); + SourceTestUtils.assertUnstartedReaderReadsSameAsItsSource(reader, p.getOptions()); + } + + /** + * This test verifies that the method + * {@link HadoopInputFormatBoundedSource.HadoopInputFormatReader#getCurrentSource() + * getCurrentSource()} returns correct source object. + */ + @Test + public void testGetCurrentSourceFunction() throws Exception { + SerializableSplit split = new SerializableSplit(); + BoundedSource<KV<Text, Employee>> source = + new HadoopInputFormatBoundedSource<Text, Employee>( + serConf, + WritableCoder.of(Text.class), + AvroCoder.of(Employee.class), + null, // No key translation required. + null, // No value translation required. + split); + BoundedReader<KV<Text, Employee>> hifReader = source.createReader(p.getOptions()); + BoundedSource<KV<Text, Employee>> hifSource = hifReader.getCurrentSource(); + assertEquals(hifSource, source); + } + + /** + * This test validates behavior of {@link HadoopInputFormatBoundedSource#createReader() + * createReader()} method when {@link HadoopInputFormatBoundedSource#splitIntoBundles() + * splitIntoBundles()} is not called. 
+ */ + @Test + public void testCreateReaderIfSplitIntoBundlesNotCalled() throws Exception { + HadoopInputFormatBoundedSource<Text, Employee> hifSource = getTestHIFSource( + EmployeeInputFormat.class, + Text.class, + Employee.class, + WritableCoder.of(Text.class), + AvroCoder.of(Employee.class)); + thrown.expect(IOException.class); + thrown.expectMessage("Cannot create reader as source is not split yet."); + hifSource.createReader(p.getOptions()); + } + + /** + * This test validates behavior of + * {@link HadoopInputFormatBoundedSource#computeSplitsIfNecessary() computeSplits()} when Hadoop + * InputFormat's {@link InputFormat#getSplits() getSplits()} returns empty list. + */ + @Test + public void testComputeSplitsIfGetSplitsReturnsEmptyList() throws Exception { + InputFormat<?, ?> mockInputFormat = Mockito.mock(EmployeeInputFormat.class); + SerializableSplit mockInputSplit = Mockito.mock(SerializableSplit.class); + Mockito.when(mockInputFormat.getSplits(Mockito.any(JobContext.class))).thenReturn( + new ArrayList<InputSplit>()); + HadoopInputFormatBoundedSource<Text, Employee> hifSource = + new HadoopInputFormatBoundedSource<Text, Employee>( + serConf, + WritableCoder.of(Text.class), + AvroCoder.of(Employee.class), + null, // No key translation required. + null, // No value translation required. + mockInputSplit); + thrown.expect(IOException.class); + thrown.expectMessage("Error in computing splits, getSplits() returns a empty list"); + hifSource.setInputFormatObj(mockInputFormat); + hifSource.computeSplitsIfNecessary(); + } + + /** + * This test validates behavior of + * {@link HadoopInputFormatBoundedSource#computeSplitsIfNecessary() computeSplits()} when Hadoop + * InputFormat's {@link InputFormat#getSplits() getSplits()} returns NULL value. + */ + @Test + public void testComputeSplitsIfGetSplitsReturnsNullValue() throws Exception { + InputFormat<Text, Employee> mockInputFormat = Mockito.mock(EmployeeInputFormat.class); + SerializableSplit mockInputSplit = Mockito.mock(SerializableSplit.class); + Mockito.when(mockInputFormat.getSplits(Mockito.any(JobContext.class))).thenReturn(null); + HadoopInputFormatBoundedSource<Text, Employee> hifSource = + new HadoopInputFormatBoundedSource<Text, Employee>( + serConf, + WritableCoder.of(Text.class), + AvroCoder.of(Employee.class), + null, // No key translation required. + null, // No value translation required. + mockInputSplit); + thrown.expect(IOException.class); + thrown.expectMessage("Error in computing splits, getSplits() returns null."); + hifSource.setInputFormatObj(mockInputFormat); + hifSource.computeSplitsIfNecessary(); + } + + /** + * This test validates behavior of + * {@link HadoopInputFormatBoundedSource#computeSplitsIfNecessary() computeSplits()} if Hadoop + * InputFormat's {@link InputFormat#getSplits() getSplits()} returns InputSplit list having some + * null values. + */ + @Test + public void testComputeSplitsIfGetSplitsReturnsListHavingNullValues() throws Exception { + // InputSplit list having null value. 
+ InputSplit mockInputSplit = + Mockito.mock(InputSplit.class, Mockito.withSettings().extraInterfaces(Writable.class)); + List<InputSplit> inputSplitList = new ArrayList<InputSplit>(); + inputSplitList.add(mockInputSplit); + inputSplitList.add(null); + InputFormat<Text, Employee> mockInputFormat = Mockito.mock(EmployeeInputFormat.class); + Mockito.when(mockInputFormat.getSplits(Mockito.any(JobContext.class))).thenReturn( + inputSplitList); + HadoopInputFormatBoundedSource<Text, Employee> hifSource = + new HadoopInputFormatBoundedSource<Text, Employee>( + serConf, + WritableCoder.of(Text.class), + AvroCoder.of(Employee.class), + null, // No key translation required. + null, // No value translation required. + new SerializableSplit()); + thrown.expect(IOException.class); + thrown.expectMessage("Error in computing splits, split is null in InputSplits list populated " + + "by getSplits() : "); + hifSource.setInputFormatObj(mockInputFormat); + hifSource.computeSplitsIfNecessary(); + } + + /** + * This test validates records emitted in PCollection are immutable if InputFormat's recordReader + * returns same objects(i.e. same locations in memory) but with updated values for each record. + */ + @Test + public void testImmutablityOfOutputOfReadIfRecordReaderObjectsAreMutable() throws Exception { + List<BoundedSource<KV<Text, Employee>>> boundedSourceList = getBoundedSourceList( + ReuseObjectsEmployeeInputFormat.class, + Text.class, + Employee.class, + WritableCoder.of(Text.class), + AvroCoder.of(Employee.class)); + List<KV<Text, Employee>> bundleRecords = new ArrayList<>(); + for (BoundedSource<KV<Text, Employee>> source : boundedSourceList) { + List<KV<Text, Employee>> elems = SourceTestUtils.readFromSource(source, p.getOptions()); + bundleRecords.addAll(elems); + } + List<KV<Text, Employee>> referenceRecords = TestEmployeeDataSet.getEmployeeData(); + assertThat(bundleRecords, containsInAnyOrder(referenceRecords.toArray())); + } + + /** + * Test reading if InputFormat implements {@link org.apache.hadoop.conf.Configurable + * Configurable}. + */ + @Test + public void testReadingWithConfigurableInputFormat() throws Exception { + List<BoundedSource<KV<Text, Employee>>> boundedSourceList = getBoundedSourceList( + ConfigurableEmployeeInputFormat.class, + Text.class, + Employee.class, + WritableCoder.of(Text.class), + AvroCoder.of(Employee.class)); + for (BoundedSource<KV<Text, Employee>> source : boundedSourceList) { + // Cast to HadoopInputFormatBoundedSource to access getInputFormat(). + @SuppressWarnings("unchecked") + HadoopInputFormatBoundedSource<Text, Employee> hifSource = + (HadoopInputFormatBoundedSource<Text, Employee>) source; + hifSource.createInputFormatInstance(); + ConfigurableEmployeeInputFormat inputFormatObj = + (ConfigurableEmployeeInputFormat) hifSource.getInputFormat(); + assertEquals(true, inputFormatObj.isConfSet); + } + } + + /** + * This test validates records emitted in PCollection are immutable if InputFormat's + * {@link org.apache.hadoop.mapreduce.RecordReader RecordReader} returns different objects (i.e. + * different locations in memory). 
+ */ + @Test + public void testImmutablityOfOutputOfReadIfRecordReaderObjectsAreImmutable() throws Exception { + List<BoundedSource<KV<Text, Employee>>> boundedSourceList = getBoundedSourceList( + EmployeeInputFormat.class, + Text.class, + Employee.class, + WritableCoder.of(Text.class), + AvroCoder.of(Employee.class)); + List<KV<Text, Employee>> bundleRecords = new ArrayList<>(); + for (BoundedSource<KV<Text, Employee>> source : boundedSourceList) { + List<KV<Text, Employee>> elems = SourceTestUtils.readFromSource(source, p.getOptions()); + bundleRecords.addAll(elems); + } + List<KV<Text, Employee>> referenceRecords = TestEmployeeDataSet.getEmployeeData(); + assertThat(bundleRecords, containsInAnyOrder(referenceRecords.toArray())); + } + + private static SerializableConfiguration loadTestConfiguration(Class<?> inputFormatClassName, + Class<?> keyClass, Class<?> valueClass) { + Configuration conf = new Configuration(); + conf.setClass("mapreduce.job.inputformat.class", inputFormatClassName, InputFormat.class); + conf.setClass("key.class", keyClass, Object.class); + conf.setClass("value.class", valueClass, Object.class); + return new SerializableConfiguration(conf); + } + + private <K, V> HadoopInputFormatBoundedSource<K, V> getTestHIFSource( + Class<?> inputFormatClass, + Class<K> inputFormatKeyClass, + Class<V> inputFormatValueClass, + Coder<K> keyCoder, + Coder<V> valueCoder){ + SerializableConfiguration serConf = + loadTestConfiguration( + inputFormatClass, + inputFormatKeyClass, + inputFormatValueClass); + return new HadoopInputFormatBoundedSource<K, V>( + serConf, + keyCoder, + valueCoder, + null, // No key translation required. + null); // No value translation required. + } + + private <K, V> List<BoundedSource<KV<K, V>>> getBoundedSourceList( + Class<?> inputFormatClass, + Class<K> inputFormatKeyClass, + Class<V> inputFormatValueClass, + Coder<K> keyCoder, + Coder<V> valueCoder) throws Exception{ + HadoopInputFormatBoundedSource<K, V> boundedSource = getTestHIFSource( + inputFormatClass, + inputFormatKeyClass, + inputFormatValueClass, + keyCoder, + valueCoder); + return boundedSource.splitIntoBundles(0, p.getOptions()); + } +}
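[Editor's note — illustrative sketch, not part of the committed patch] The tests above all build the Read transform from a Hadoop Configuration that names the InputFormat, key, and value classes, optionally adding key/value translation functions. A minimal standalone usage, assuming the EmployeeInputFormat and Employee test classes added by this patch are on the classpath, could look like the following; everything else mirrors loadTestConfiguration() and myValueTranslate from the test above.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;

public class HifIOReadSketch {
  public static void main(String[] args) {
    // The configuration must name the InputFormat plus its key and value classes,
    // exactly as loadTestConfiguration() does in the test class above.
    Configuration conf = new Configuration();
    conf.setClass("mapreduce.job.inputformat.class", EmployeeInputFormat.class, InputFormat.class);
    conf.setClass("key.class", Text.class, Object.class);
    conf.setClass("value.class", Employee.class, Object.class);

    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());

    // Read emits KV pairs typed by the configured key/value classes; the optional
    // value translation maps Employee records to plain Strings, mirroring myValueTranslate.
    PCollection<KV<Text, String>> records = pipeline.apply(
        HadoopInputFormatIO.<Text, String>read()
            .withConfiguration(conf)
            .withValueTranslation(new SimpleFunction<Employee, String>() {
              @Override
              public String apply(Employee input) {
                return input.getEmpName() + "_" + input.getEmpAddress();
              }
            }));

    pipeline.run().waitUntilFinish();
  }
}
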
http://git-wip-us.apache.org/repos/asf/beam/blob/174436bc/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/ReuseObjectsEmployeeInputFormat.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/ReuseObjectsEmployeeInputFormat.java b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/ReuseObjectsEmployeeInputFormat.java new file mode 100644 index 0000000..fbe74ec --- /dev/null +++ b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/ReuseObjectsEmployeeInputFormat.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.beam.sdk.io.hadoop.inputformat; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.beam.sdk.values.KV; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * This is a valid InputFormat for reading employee data which is available in the form of + * {@code List<KV>} as {@linkplain ReuseObjectsEmployeeRecordReader#employeeDataList + * employeeDataList}. {@linkplain ReuseObjectsEmployeeRecordReader#employeeDataList + * employeeDataList} is populated using {@linkplain TestEmployeeDataSet#populateEmployeeDataNew()}. + * + * <p>{@linkplain ReuseObjectsEmployeeInputFormat} splits data into + * {@value TestEmployeeDataSet#NUMBER_OF_SPLITS} splits, each split having + * {@value TestEmployeeDataSet#NUMBER_OF_RECORDS_IN_EACH_SPLIT} records each. + * {@linkplain ReuseObjectsEmployeeInputFormat} reads data from + * {@linkplain ReuseObjectsEmployeeRecordReader#employeeDataList employeeDataList} and produces a + * key (employee id) of type Text and value of type {@linkplain Employee Employee}. + * + * <p>{@linkplain ReuseObjectsEmployeeInputFormat} is also input to test whether + * {@linkplain HadoopInputFormatIO } source returns immutable records for a scenario when + * RecordReader returns the same key and value objects with updating values every time it reads + * data. 
+ */ +public class ReuseObjectsEmployeeInputFormat extends InputFormat<Text, Employee> { + + public ReuseObjectsEmployeeInputFormat() {} + + @Override + public RecordReader<Text, Employee> createRecordReader(InputSplit split, + TaskAttemptContext context) throws IOException, InterruptedException { + return new ReuseObjectsEmployeeRecordReader(); + } + + @Override + public List<InputSplit> getSplits(JobContext arg0) throws IOException, InterruptedException { + List<InputSplit> inputSplitList = new ArrayList<InputSplit>(); + for (int i = 1; i <= TestEmployeeDataSet.NUMBER_OF_SPLITS; i++) { + InputSplit inputSplitObj = new ReuseEmployeeInputSplit( + ((i - 1) * TestEmployeeDataSet.NUMBER_OF_RECORDS_IN_EACH_SPLIT), + (i * TestEmployeeDataSet.NUMBER_OF_RECORDS_IN_EACH_SPLIT - 1)); + inputSplitList.add(inputSplitObj); + } + return inputSplitList; + } + + /** + * InputSplit implementation for ReuseObjectsEmployeeInputFormat. + */ + public class ReuseEmployeeInputSplit extends InputSplit implements Writable { + // Start and end map index of each split of employeeData. + private long startIndex; + private long endIndex; + + public ReuseEmployeeInputSplit() {} + + public ReuseEmployeeInputSplit(long startIndex, long endIndex) { + this.startIndex = startIndex; + this.endIndex = endIndex; + } + + /** Returns number of records in each split. */ + @Override + public long getLength() throws IOException, InterruptedException { + return this.endIndex - this.startIndex + 1; + } + + @Override + public String[] getLocations() throws IOException, InterruptedException { + return null; + } + + + public long getStartIndex() { + return startIndex; + } + + public long getEndIndex() { + return endIndex; + } + + @Override + public void readFields(DataInput dataIn) throws IOException { + startIndex = dataIn.readLong(); + endIndex = dataIn.readLong(); + } + + @Override + public void write(DataOutput dataOut) throws IOException { + dataOut.writeLong(startIndex); + dataOut.writeLong(endIndex); + } + } + + /** + * RecordReader for ReuseObjectsEmployeeInputFormat. 
+ */ + public class ReuseObjectsEmployeeRecordReader extends RecordReader<Text, Employee> { + + private ReuseEmployeeInputSplit split; + private Text currentKey = new Text(); + private Employee currentValue = new Employee(); + private long employeeListIndex = 0L; + private long recordsRead = 0L; + private List<KV<String, String>> employeeDataList; + + public ReuseObjectsEmployeeRecordReader() {} + + @Override + public void close() throws IOException {} + + @Override + public Text getCurrentKey() throws IOException, InterruptedException { + return currentKey; + } + + @Override + public Employee getCurrentValue() throws IOException, InterruptedException { + return currentValue; + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return (float) recordsRead / split.getLength(); + } + + @Override + public void initialize(InputSplit split, TaskAttemptContext arg1) + throws IOException, InterruptedException { + this.split = (ReuseEmployeeInputSplit) split; + employeeListIndex = this.split.getStartIndex() - 1; + recordsRead = 0; + employeeDataList = TestEmployeeDataSet.populateEmployeeData(); + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + if ((recordsRead++) >= split.getLength()) { + return false; + } + employeeListIndex++; + KV<String, String> employeeDetails = employeeDataList.get((int) employeeListIndex); + String empData[] = employeeDetails.getValue().split("_"); + // Updating the same key and value objects with new employee data. + currentKey.set(employeeDetails.getKey()); + currentValue.setEmpName(empData[0]); + currentValue.setEmpAddress(empData[1]); + return true; + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/174436bc/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestEmployeeDataSet.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestEmployeeDataSet.java b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestEmployeeDataSet.java new file mode 100644 index 0000000..4a8fe95 --- /dev/null +++ b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestEmployeeDataSet.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.beam.sdk.io.hadoop.inputformat; + +import com.google.common.base.Function; +import com.google.common.collect.Lists; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.beam.sdk.values.KV; +import org.apache.hadoop.io.Text; +/** + * Test Utils used in {@link EmployeeInputFormat} and {@link ReuseObjectsEmployeeInputFormat} for + * computing splits. + */ +public class TestEmployeeDataSet { + public static final long NUMBER_OF_RECORDS_IN_EACH_SPLIT = 5L; + public static final long NUMBER_OF_SPLITS = 3L; + private static final List<KV<String, String>> data = new ArrayList<KV<String, String>>(); + + /** + * Returns List of employee details. Employee details are available in the form of {@link KV} in + * which, key indicates employee id and value indicates employee details such as name and address + * separated by '_'. This is data input to {@link EmployeeInputFormat} and + * {@link ReuseObjectsEmployeeInputFormat}. + */ + public static List<KV<String, String>> populateEmployeeData() { + if (!data.isEmpty()) { + return data; + } + data.add(KV.of("0", "Alex_US")); + data.add(KV.of("1", "John_UK")); + data.add(KV.of("2", "Tom_UK")); + data.add(KV.of("3", "Nick_UAE")); + data.add(KV.of("4", "Smith_IND")); + data.add(KV.of("5", "Taylor_US")); + data.add(KV.of("6", "Gray_UK")); + data.add(KV.of("7", "James_UAE")); + data.add(KV.of("8", "Jordan_IND")); + data.add(KV.of("9", "Leena_UK")); + data.add(KV.of("10", "Zara_UAE")); + data.add(KV.of("11", "Talia_IND")); + data.add(KV.of("12", "Rose_UK")); + data.add(KV.of("13", "Kelvin_UAE")); + data.add(KV.of("14", "Goerge_IND")); + return data; + } + + /** + * This is a helper function used in unit tests for validating data against data read using + * {@link EmployeeInputFormat} and {@link ReuseObjectsEmployeeInputFormat}. + */ + public static List<KV<Text, Employee>> getEmployeeData() { + return Lists.transform((data.isEmpty() ? populateEmployeeData() : data), + new Function<KV<String, String>, KV<Text, Employee>>() { + @Override + public KV<Text, Employee> apply(KV<String, String> input) { + String[] empData = input.getValue().split("_"); + return KV.of(new Text(input.getKey()), new Employee(empData[0], empData[1])); + } + }); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/174436bc/sdks/java/io/hadoop/jdk1.8-tests/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop/jdk1.8-tests/pom.xml b/sdks/java/io/hadoop/jdk1.8-tests/pom.xml new file mode 100644 index 0000000..4c510ae --- /dev/null +++ b/sdks/java/io/hadoop/jdk1.8-tests/pom.xml @@ -0,0 +1,278 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> + +<!-- The HifIO tests for Cassandra and Elasticsearch both work only with + jdk1.8, but Beam's enforcer rules require jdk1.7 and jdk1.8 support. This + child module contains only those tests and overrides the enforcer rules to + allow 1.8 only behavior without making all of HifIO work only with jdk1.8. --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-io-hadoop-parent</artifactId> + <version>0.7.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + <artifactId>beam-sdks-java-io-hadoop-jdk1.8-tests</artifactId> + <name>Apache Beam :: SDKs :: Java :: IO :: Hadoop :: jdk1.8-tests</name> + <description>Integration tests and junits which need JDK1.8.</description> + + <build> + <plugins> + <plugin> + <!-- Guava shading is required as Cassandra tests require version + 19 of Guava, by default project wide Guava shading may not suffice as it + loads a different version of guava which will not work for Cassandra tests --> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <artifactSet> + <includes> + <include>com.google.guava:guava:19.0</include> + </includes> + </artifactSet> + <relocations> + <relocation> + <pattern>com.google.common</pattern> + <shadedPattern>org.apache.beam.sdk.io.hadoop.jdk1.8-tests.repackaged.com.google.common</shadedPattern> + </relocation> + <relocation> + <pattern>com.google.thirdparty</pattern> + <shadedPattern>org.apache.beam.sdk.io.hadoop.jdk1.8-tests.repackaged.com.google.thirdparty</shadedPattern> + </relocation> + </relocations> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + <!-- Overridden enforcer plugin for JDK1.8 for running tests --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-enforcer-plugin</artifactId> + <version>1.4.1</version> + <executions> + <execution> + <id>enforce</id> + <goals> + <goal>enforce</goal> + </goals> + <configuration> + <rules> + <enforceBytecodeVersion> + <maxJdkVersion>1.8</maxJdkVersion> + <excludes> + <!-- Supplied by the user JDK and compiled with matching + version. Is not shaded, so safe to ignore. --> + <exclude>jdk.tools:jdk.tools</exclude> + </excludes> + </enforceBytecodeVersion> + <requireJavaVersion> + <version>[1.8,)</version> + </requireJavaVersion> + </rules> + </configuration> + </execution> + </executions> + <dependencies> + <dependency> + <groupId>org.codehaus.mojo</groupId> + <artifactId>extra-enforcer-rules</artifactId> + <version>1.0-beta-6</version> + </dependency> + </dependencies> + </plugin> + </plugins> + </build> + <!--The dataflow-runner and spark-runner profiles support using those runners + during an integration test. These are not the long-term way we want to support + using runners in ITs (e.g. it is annoying to add to all IO modules.) We cannot + create a dependency IO -> Runners since the runners depend on IO (e.g. kafka + depends on spark.) 
--> + + <profiles> + <!-- Include the Apache Spark runner -P spark-runner --> + <profile> + <id>spark-runner</id> + <dependencies> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-spark</artifactId> + <scope>runtime</scope> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming_2.10</artifactId> + <version>${spark.version}</version> + <scope>runtime</scope> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_2.10</artifactId> + <version>${spark.version}</version> + <scope>runtime</scope> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>jul-to-slf4j</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + </profile> + + <!-- Include the Google Cloud Dataflow runner -P dataflow-runner --> + <profile> + <id>dataflow-runner</id> + <dependencies> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-google-cloud-dataflow-java</artifactId> + <scope>runtime</scope> + </dependency> + </dependencies> + </profile> + </profiles> + + <properties> + <log4j.core.version>2.6.2</log4j.core.version> + <hadoop.common.version>2.7.0</hadoop.common.version> + <guava.version>19.0</guava.version> + <transport.netty4.client.version>5.0.0</transport.netty4.client.version> + <netty.transport.native.epoll.version>4.1.0.CR3</netty.transport.native.epoll.version> + <elasticsearch.version>5.0.0</elasticsearch.version> + <cassandra.driver.mapping.version>3.1.1</cassandra.driver.mapping.version> + <cassandra.all.verison>3.9</cassandra.all.verison> + <cassandra.driver.core.version>3.1.1</cassandra.driver.core.version> + <commons.io.version>2.4</commons.io.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-io-hadoop-input-format</artifactId> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>${guava.version}</version> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + + <!-- compile dependencies --> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop.common.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + <version>${hadoop.common.version}</version> + <scope>provided</scope> + </dependency> + + <!-- test dependencies --> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-direct-java</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-core</artifactId> + <version>2.6.2</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.elasticsearch.plugin</groupId> + <artifactId>transport-netty4-client</artifactId> + <version>${transport.netty4.client.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.elasticsearch.client</groupId> + <artifactId>transport</artifactId> + <version>${elasticsearch.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-transport-native-epoll</artifactId> + <version>${netty.transport.native.epoll.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.elasticsearch</groupId> + 
<artifactId>elasticsearch</artifactId> + <version>${elasticsearch.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.elasticsearch</groupId> + <artifactId>elasticsearch-hadoop</artifactId> + <version>${elasticsearch.version}</version> + </dependency> + <dependency> + <groupId>com.datastax.cassandra</groupId> + <artifactId>cassandra-driver-mapping</artifactId> + <version>${cassandra.driver.mapping.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.cassandra</groupId> + <artifactId>cassandra-all</artifactId> + <version>${cassandra.all.verison}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.datastax.cassandra</groupId> + <artifactId>cassandra-driver-core</artifactId> + <version>${cassandra.driver.core.version}</version> + <scope>test</scope> + </dependency> + + <!-- runtime dependencies --> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + <version>${commons.io.version}</version> + <scope>runtime</scope> + </dependency> + </dependencies> + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/174436bc/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithElasticTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithElasticTest.java b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithElasticTest.java new file mode 100644 index 0000000..599a4a1 --- /dev/null +++ b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithElasticTest.java @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.hadoop.inputformat; + +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import org.apache.beam.sdk.io.hadoop.inputformat.hashing.HashingFn; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.Values; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputFormat; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.hadoop.cfg.ConfigurationOptions; +import org.elasticsearch.hadoop.mr.EsInputFormat; +import org.elasticsearch.hadoop.mr.LinkedMapWritable; +import org.elasticsearch.node.Node; +import org.elasticsearch.node.NodeValidationException; +import org.elasticsearch.node.internal.InternalSettingsPreparer; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.transport.Netty4Plugin; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tests to validate HadoopInputFormatIO for embedded Elasticsearch instance. + * + * {@link EsInputFormat} can be used to read data from Elasticsearch. EsInputFormat by default + * returns key class as Text and value class as LinkedMapWritable. You can also set MapWritable as + * value class, provided that you set the property "mapred.mapoutput.value.class" with + * MapWritable.class. If this property is not set then, using MapWritable as value class may give + * org.apache.beam.sdk.coders.CoderException due to unexpected extra bytes after decoding. 
+ */ + +@RunWith(JUnit4.class) +public class HIFIOWithElasticTest implements Serializable { + + private static final long serialVersionUID = 1L; + private static final Logger LOGGER = LoggerFactory.getLogger(HIFIOWithElasticTest.class); + private static final String ELASTIC_IN_MEM_HOSTNAME = "127.0.0.1"; + private static final String ELASTIC_IN_MEM_PORT = "9200"; + private static final String ELASTIC_INTERNAL_VERSION = "5.x"; + private static final String TRUE = "true"; + private static final String ELASTIC_INDEX_NAME = "beamdb"; + private static final String ELASTIC_TYPE_NAME = "scientists"; + private static final String ELASTIC_RESOURCE = "/" + ELASTIC_INDEX_NAME + "/" + ELASTIC_TYPE_NAME; + private static final int TEST_DATA_ROW_COUNT = 10; + private static final String ELASTIC_TYPE_ID_PREFIX = "s"; + + @ClassRule + public static TemporaryFolder elasticTempFolder = new TemporaryFolder(); + + @Rule + public final transient TestPipeline pipeline = TestPipeline.create(); + + @BeforeClass + public static void startServer() + throws NodeValidationException, InterruptedException, IOException { + ElasticEmbeddedServer.startElasticEmbeddedServer(); + } + + /** + * Test to read data from embedded Elasticsearch instance and verify whether data is read + * successfully. + */ + @Test + public void testHifIOWithElastic() { + // Expected hashcode is evaluated during insertion time one time and hardcoded here. + String expectedHashCode = "e2098f431f90193aa4545e033e6fd2217aafe7b6"; + Configuration conf = getConfiguration(); + PCollection<KV<Text, LinkedMapWritable>> esData = + pipeline.apply(HadoopInputFormatIO.<Text, LinkedMapWritable>read().withConfiguration(conf)); + PCollection<Long> count = esData.apply(Count.<KV<Text, LinkedMapWritable>>globally()); + // Verify that the count of objects fetched using HIFInputFormat IO is correct. + PAssert.thatSingleton(count).isEqualTo((long) TEST_DATA_ROW_COUNT); + PCollection<LinkedMapWritable> values = esData.apply(Values.<LinkedMapWritable>create()); + PCollection<String> textValues = values.apply(transformFunc); + // Verify the output values using checksum comparison. + PCollection<String> consolidatedHashcode = + textValues.apply(Combine.globally(new HashingFn()).withoutDefaults()); + PAssert.that(consolidatedHashcode).containsInAnyOrder(expectedHashCode); + pipeline.run().waitUntilFinish(); + } + + MapElements<LinkedMapWritable, String> transformFunc = + MapElements.<LinkedMapWritable, String>via(new SimpleFunction<LinkedMapWritable, String>() { + @Override + public String apply(LinkedMapWritable mapw) { + return mapw.get(new Text("id")) + "|" + mapw.get(new Text("scientist")); + } + }); + /** + * Test to read data from embedded Elasticsearch instance based on query and verify whether data + * is read successfully. 
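Note: the hardcoded checksum used above can in principle be recomputed offline from the generated rows. A rough, unverified sketch, assuming the same "id|scientist" formatting as transformFunc and the same SHA-1 plus combineUnordered scheme as HashingFn:

import com.google.common.hash.HashCode;
import com.google.common.hash.Hashing;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class ExpectedChecksumSketch {
  // Recomputes an order-independent SHA-1 checksum over the generated rows
  // ("s0|Faraday0" .. "s9|Faraday9"); combineUnordered() makes the result
  // insensitive to the order in which the pipeline reads the documents.
  static String expectedChecksum(int rowCount) {
    List<HashCode> hashes = new ArrayList<>();
    for (int i = 0; i < rowCount; i++) {
      String row = "s" + i + "|" + "Faraday" + i;
      hashes.add(Hashing.sha1().hashString(row, StandardCharsets.UTF_8));
    }
    return Hashing.combineUnordered(hashes).toString();
  }
}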
+ */ + @Test + public void testHifIOWithElasticQuery() { + long expectedRowCount = 1L; + String expectedHashCode = "caa37dbd8258e3a7f98932958c819a57aab044ec"; + Configuration conf = getConfiguration(); + String fieldValue = ELASTIC_TYPE_ID_PREFIX + "2"; + String query = "{" + + " \"query\": {" + + " \"match\" : {" + + " \"id\" : {" + + " \"query\" : \"" + fieldValue + "\"," + + " \"type\" : \"boolean\"" + + " }" + + " }" + + " }" + + "}"; + conf.set(ConfigurationOptions.ES_QUERY, query); + PCollection<KV<Text, LinkedMapWritable>> esData = + pipeline.apply(HadoopInputFormatIO.<Text, LinkedMapWritable>read().withConfiguration(conf)); + PCollection<Long> count = esData.apply(Count.<KV<Text, LinkedMapWritable>>globally()); + // Verify that the count of objects fetched using HIFInputFormat IO is correct. + PAssert.thatSingleton(count).isEqualTo(expectedRowCount); + PCollection<LinkedMapWritable> values = esData.apply(Values.<LinkedMapWritable>create()); + PCollection<String> textValues = values.apply(transformFunc); + // Verify the output values using checksum comparison. + PCollection<String> consolidatedHashcode = + textValues.apply(Combine.globally(new HashingFn()).withoutDefaults()); + PAssert.that(consolidatedHashcode).containsInAnyOrder(expectedHashCode); + pipeline.run().waitUntilFinish(); + } + + /** + * Set the Elasticsearch configuration parameters in the Hadoop configuration object. + * Configuration object should have InputFormat class, key class and value class set. Mandatory + * fields for ESInputFormat to be set are es.resource, es.nodes, es.port, es.internal.es.version. + * Please refer to + * <a href="https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html" + * >Elasticsearch Configuration</a> for more details. + */ + public Configuration getConfiguration() { + Configuration conf = new Configuration(); + conf.set(ConfigurationOptions.ES_NODES, ELASTIC_IN_MEM_HOSTNAME); + conf.set(ConfigurationOptions.ES_PORT, String.format("%s", ELASTIC_IN_MEM_PORT)); + conf.set(ConfigurationOptions.ES_RESOURCE, ELASTIC_RESOURCE); + conf.set("es.internal.es.version", ELASTIC_INTERNAL_VERSION); + conf.set(ConfigurationOptions.ES_NODES_DISCOVERY, TRUE); + conf.set(ConfigurationOptions.ES_INDEX_AUTO_CREATE, TRUE); + conf.setClass("mapreduce.job.inputformat.class", + org.elasticsearch.hadoop.mr.EsInputFormat.class, InputFormat.class); + conf.setClass("key.class", Text.class, Object.class); + conf.setClass("value.class", LinkedMapWritable.class, Object.class); + return conf; + } + + private static Map<String, String> createElasticRow(String id, String name) { + Map<String, String> data = new HashMap<String, String>(); + data.put("id", id); + data.put("scientist", name); + return data; + } + + @AfterClass + public static void shutdownServer() throws IOException { + ElasticEmbeddedServer.shutdown(); + } + + /** + * Class for in memory Elasticsearch server. 
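Note: a sketch only, reusing this test's imports, pipeline field and getConfiguration() helper. The LinkedMapWritable-to-String conversion could also be pushed into the source itself via withValueTranslation, instead of applying a separate MapElements step afterwards:

SimpleFunction<LinkedMapWritable, String> toIdAndScientist =
    new SimpleFunction<LinkedMapWritable, String>() {
      @Override
      public String apply(LinkedMapWritable row) {
        return row.get(new Text("id")) + "|" + row.get(new Text("scientist"));
      }
    };
// Values arrive already translated, so downstream steps consume
// PCollection<KV<Text, String>> directly.
PCollection<KV<Text, String>> rows =
    pipeline.apply(HadoopInputFormatIO.<Text, String>read()
        .withConfiguration(getConfiguration())
        .withValueTranslation(toIdAndScientist));

This is purely illustrative; it produces the same strings as the MapElements approach used in the tests above.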
+ */ + static class ElasticEmbeddedServer implements Serializable { + private static final long serialVersionUID = 1L; + private static Node node; + + public static void startElasticEmbeddedServer() + throws NodeValidationException, InterruptedException { + Settings settings = Settings.builder() + .put("node.data", TRUE) + .put("network.host", ELASTIC_IN_MEM_HOSTNAME) + .put("http.port", ELASTIC_IN_MEM_PORT) + .put("path.data", elasticTempFolder.getRoot().getPath()) + .put("path.home", elasticTempFolder.getRoot().getPath()) + .put("transport.type", "local") + .put("http.enabled", TRUE) + .put("node.ingest", TRUE).build(); + node = new PluginNode(settings); + node.start(); + LOGGER.info("Elastic in memory server started."); + prepareElasticIndex(); + LOGGER.info("Prepared index " + ELASTIC_INDEX_NAME + " and populated data on elastic in memory server."); + } + + /** + * Prepares the Elastic index by adding rows. + */ + private static void prepareElasticIndex() throws InterruptedException { + CreateIndexRequest indexRequest = new CreateIndexRequest(ELASTIC_INDEX_NAME); + node.client().admin().indices().create(indexRequest).actionGet(); + for (int i = 0; i < TEST_DATA_ROW_COUNT; i++) { + node.client().prepareIndex(ELASTIC_INDEX_NAME, ELASTIC_TYPE_NAME, String.valueOf(i)) + .setSource(createElasticRow(ELASTIC_TYPE_ID_PREFIX + i, "Faraday" + i)).execute() + .actionGet(); + } + node.client().admin().indices().prepareRefresh(ELASTIC_INDEX_NAME).get(); + } + /** + * Shuts down the embedded instance. + * @throws IOException + */ + public static void shutdown() throws IOException { + DeleteIndexRequest indexRequest = new DeleteIndexRequest(ELASTIC_INDEX_NAME); + node.client().admin().indices().delete(indexRequest).actionGet(); + LOGGER.info("Deleted index " + ELASTIC_INDEX_NAME + " from elastic in memory server"); + node.close(); + LOGGER.info("Closed elastic in memory server node."); + deleteElasticDataDirectory(); + } + + private static void deleteElasticDataDirectory() { + try { + FileUtils.deleteDirectory(new File(elasticTempFolder.getRoot().getPath())); + } catch (IOException e) { + throw new RuntimeException("Could not delete elastic data directory: " + e.getMessage(), e); + } + } + } + + /** + * Node subclass created so that the "http.enabled" property can be set to "true"; it registers + * the Netty4Plugin, which provides the HTTP transport for the embedded Elasticsearch node. + */ + static class PluginNode extends Node implements Serializable { + + private static final long serialVersionUID = 1L; + static Collection<Class<? extends Plugin>> list = new ArrayList<Class<? extends Plugin>>(); + static { + list.add(Netty4Plugin.class); + } + + public PluginNode(final Settings settings) { + super(InternalSettingsPreparer.prepareEnvironment(settings, null), list); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/174436bc/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/custom/options/HIFTestOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/custom/options/HIFTestOptions.java b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/custom/options/HIFTestOptions.java new file mode 100644 index 0000000..2e89ed1 --- /dev/null +++ b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/custom/options/HIFTestOptions.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
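Note: for completeness, the @BeforeClass/@AfterClass lifecycle used by HIFIOWithElasticTest could also be expressed as a JUnit rule. A sketch only, not part of this patch (field placed inside the test class, with org.junit.rules.ExternalResource imported; ordering against elasticTempFolder would still need a RuleChain):

@ClassRule
public static ExternalResource elasticServer = new ExternalResource() {
  @Override
  protected void before() throws Throwable {
    // Same as the @BeforeClass hook: boot the embedded node and load test data.
    ElasticEmbeddedServer.startElasticEmbeddedServer();
  }

  @Override
  protected void after() {
    try {
      // Same as the @AfterClass hook: drop the index and stop the node.
      ElasticEmbeddedServer.shutdown();
    } catch (IOException e) {
      throw new RuntimeException("Failed to shut down embedded Elasticsearch", e);
    }
  }
};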
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.hadoop.inputformat.custom.options; + +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.testing.TestPipelineOptions; + +/** + * Properties needed when using HadoopInputFormatIO with the Beam SDK. + */ +public interface HIFTestOptions extends TestPipelineOptions { + + //Cassandra test options + @Description("Cassandra Server IP") + @Default.String("cassandraServerIp") + String getCassandraServerIp(); + void setCassandraServerIp(String cassandraServerIp); + @Description("Cassandra Server port") + @Default.Integer(0) + Integer getCassandraServerPort(); + void setCassandraServerPort(Integer cassandraServerPort); + @Description("Cassandra User name") + @Default.String("cassandraUserName") + String getCassandraUserName(); + void setCassandraUserName(String cassandraUserName); + @Description("Cassandra Password") + @Default.String("cassandraPassword") + String getCassandraPassword(); + void setCassandraPassword(String cassandraPassword); + + //Elasticsearch test options + @Description("Elasticsearch Server IP") + @Default.String("elasticServerIp") + String getElasticServerIp(); + void setElasticServerIp(String elasticServerIp); + @Description("Elasticsearch Server port") + @Default.Integer(0) + Integer getElasticServerPort(); + void setElasticServerPort(Integer elasticServerPort); + @Description("Elasticsearch User name") + @Default.String("elasticUserName") + String getElasticUserName(); + void setElasticUserName(String elasticUserName); + @Description("Elastic Password") + @Default.String("elasticPassword") + String getElasticPassword(); + void setElasticPassword(String elasticPassword); +} http://git-wip-us.apache.org/repos/asf/beam/blob/174436bc/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/hashing/HashingFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/hashing/HashingFn.java b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/hashing/HashingFn.java new file mode 100644 index 0000000..fe37048 --- /dev/null +++ b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/hashing/HashingFn.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
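Note: a sketch of how an integration test might consume HIFTestOptions (assuming the usual imports of PipelineOptionsFactory, TestPipeline, Configuration and ConfigurationOptions; option values would be supplied through the beamTestPipelineOptions system property or command-line arguments):

// Register the options interface, then read the values handed to the test run.
PipelineOptionsFactory.register(HIFTestOptions.class);
HIFTestOptions options = TestPipeline.testingPipelineOptions().as(HIFTestOptions.class);
Configuration conf = new Configuration();
// Point the elasticsearch-hadoop connector at the externally provided cluster.
conf.set(ConfigurationOptions.ES_NODES, options.getElasticServerIp());
conf.set(ConfigurationOptions.ES_PORT, options.getElasticServerPort().toString());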
You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.beam.sdk.io.hadoop.inputformat.hashing; + +import com.google.common.collect.Lists; +import com.google.common.hash.HashCode; +import com.google.common.hash.Hashing; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.nio.charset.StandardCharsets; +import java.util.List; + +import org.apache.beam.sdk.coders.CannotProvideCoderException; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.transforms.Combine.CombineFn; + +/** + * Custom Function for Hashing. The combiner is combineUnordered, and accumulator is a + * HashCode. + */ +public class HashingFn extends CombineFn<String, HashingFn.Accum, String> { + + /** + * Serializable Class to store the HashCode of input String. + */ + public static class Accum implements Serializable { + HashCode hashCode = null; + + public Accum(HashCode value) { + this.hashCode = value; + } + + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + } + + private void writeObject(ObjectOutputStream out) throws IOException { + out.defaultWriteObject(); + } + } + + @Override + public Accum addInput(Accum accum, String input) { + List<HashCode> elementHashes = Lists.newArrayList(); + if (accum.hashCode != null) { + elementHashes.add(accum.hashCode); + } + HashCode inputHashCode = Hashing.sha1().hashString(input, StandardCharsets.UTF_8); + elementHashes.add(inputHashCode); + accum.hashCode = Hashing.combineUnordered(elementHashes); + return accum; + } + + @Override + public Accum mergeAccumulators(Iterable<Accum> accums) { + Accum merged = createAccumulator(); + List<HashCode> elementHashes = Lists.newArrayList(); + for (Accum accum : accums) { + if (accum.hashCode != null) { + elementHashes.add(accum.hashCode); + } + } + merged.hashCode = Hashing.combineUnordered(elementHashes); + return merged; + } + + @Override + public String extractOutput(Accum accum) { + // Return the combined hash code of list of elements in the Pcollection. + String consolidatedHash = ""; + if (accum.hashCode != null) { + consolidatedHash = accum.hashCode.toString(); + } + return consolidatedHash; + } + + @Override + public Coder<Accum> getAccumulatorCoder(CoderRegistry registry, Coder<String> inputCoder) + throws CannotProvideCoderException { + return SerializableCoder.of(Accum.class); + } + + @Override + public Coder<String> getDefaultOutputCoder(CoderRegistry registry, Coder<String> inputCoder) { + return inputCoder; + } + + @Override + public Accum createAccumulator() { + return new Accum(null); + } +}
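Note: a small, pipeline-free sketch (not part of this patch) of the property the Elasticsearch tests above rely on, namely that the checksum extracted from HashingFn does not depend on the order in which elements are added:

HashingFn fn = new HashingFn();
HashingFn.Accum ab = fn.addInput(fn.addInput(fn.createAccumulator(), "alice"), "bob");
HashingFn.Accum ba = fn.addInput(fn.addInput(fn.createAccumulator(), "bob"), "alice");
// Both orders yield the same digest because Hashing.combineUnordered()
// combines the per-element SHA-1 hashes commutatively.
assert fn.extractOutput(ab).equals(fn.extractOutput(ba));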