Repository: beam Updated Branches: refs/heads/master 84a96297c -> c46b256d7
Move HashingFn to io/common, switch to better hash Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b615013b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b615013b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b615013b Branch: refs/heads/master Commit: b615013b9c941038d3e9fd96a153f0894f52f183 Parents: 84a9629 Author: Stephen Sisk <[email protected]> Authored: Fri Apr 7 12:59:28 2017 -0700 Committer: Dan Halperin <[email protected]> Committed: Tue Apr 11 08:15:49 2017 -0700 ---------------------------------------------------------------------- sdks/java/io/common/pom.xml | 4 + .../apache/beam/sdk/io/common/HashingFn.java | 109 +++++++++++++++++++ sdks/java/io/hadoop/jdk1.8-tests/pom.xml | 46 +------- .../inputformat/HIFIOWithElasticTest.java | 6 +- .../hadoop/inputformat/hashing/HashingFn.java | 109 ------------------- .../integration/tests/HIFIOCassandraIT.java | 2 +- .../integration/tests/HIFIOElasticIT.java | 2 +- 7 files changed, 124 insertions(+), 154 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/b615013b/sdks/java/io/common/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/common/pom.xml b/sdks/java/io/common/pom.xml index fa51b47..3f6d79d 100644 --- a/sdks/java/io/common/pom.xml +++ b/sdks/java/io/common/pom.xml @@ -34,5 +34,9 @@ <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-core</artifactId> </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/beam/blob/b615013b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/HashingFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/HashingFn.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/HashingFn.java new file mode 100644 index 0000000..d534c87 --- /dev/null +++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/HashingFn.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.beam.sdk.io.common; + +import com.google.common.collect.Lists; +import com.google.common.hash.HashCode; +import com.google.common.hash.Hashing; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.nio.charset.StandardCharsets; +import java.util.List; + +import org.apache.beam.sdk.coders.CannotProvideCoderException; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.transforms.Combine.CombineFn; + +/** + * Custom Function for Hashing. The combiner is combineUnordered, and accumulator is a + * HashCode. + */ +public class HashingFn extends CombineFn<String, HashingFn.Accum, String> { + + /** + * Serializable Class to store the HashCode of input String. + */ + public static class Accum implements Serializable { + HashCode hashCode = null; + + public Accum(HashCode value) { + this.hashCode = value; + } + + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + } + + private void writeObject(ObjectOutputStream out) throws IOException { + out.defaultWriteObject(); + } + } + + @Override + public Accum addInput(Accum accum, String input) { + List<HashCode> elementHashes = Lists.newArrayList(); + if (accum.hashCode != null) { + elementHashes.add(accum.hashCode); + } + HashCode inputHashCode = Hashing.murmur3_128().hashString(input, StandardCharsets.UTF_8); + elementHashes.add(inputHashCode); + accum.hashCode = Hashing.combineUnordered(elementHashes); + return accum; + } + + @Override + public Accum mergeAccumulators(Iterable<Accum> accums) { + Accum merged = createAccumulator(); + List<HashCode> elementHashes = Lists.newArrayList(); + for (Accum accum : accums) { + if (accum.hashCode != null) { + elementHashes.add(accum.hashCode); + } + } + merged.hashCode = Hashing.combineUnordered(elementHashes); + return merged; + } + + @Override + public String extractOutput(Accum accum) { + // Return the combined hash code of list of elements in the Pcollection. + String consolidatedHash = ""; + if (accum.hashCode != null) { + consolidatedHash = accum.hashCode.toString(); + } + return consolidatedHash; + } + + @Override + public Coder<Accum> getAccumulatorCoder(CoderRegistry registry, Coder<String> inputCoder) + throws CannotProvideCoderException { + return SerializableCoder.of(Accum.class); + } + + @Override + public Coder<String> getDefaultOutputCoder(CoderRegistry registry, Coder<String> inputCoder) { + return inputCoder; + } + + @Override + public Accum createAccumulator() { + return new Accum(null); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/b615013b/sdks/java/io/hadoop/jdk1.8-tests/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop/jdk1.8-tests/pom.xml b/sdks/java/io/hadoop/jdk1.8-tests/pom.xml index 4c510ae..84b923a 100644 --- a/sdks/java/io/hadoop/jdk1.8-tests/pom.xml +++ b/sdks/java/io/hadoop/jdk1.8-tests/pom.xml @@ -36,41 +36,6 @@ <build> <plugins> - <plugin> - <!-- Guava shading is required as Cassandra tests require version - 19 of Guava, by default project wide Guava shading may not suffice as it - loads a different version of guava which will not work for Cassandra tests --> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-shade-plugin</artifactId> - <executions> - <execution> - <phase>package</phase> - <goals> - <goal>shade</goal> - </goals> - <configuration> - <artifactSet> - <includes> - <include>com.google.guava:guava:19.0</include> - </includes> - </artifactSet> - <relocations> - <relocation> - <pattern>com.google.common</pattern> - <shadedPattern>org.apache.beam.sdk.io.hadoop.jdk1.8-tests.repackaged.com.google.common</shadedPattern> - </relocation> - <relocation> - <pattern>com.google.thirdparty</pattern> - <shadedPattern>org.apache.beam.sdk.io.hadoop.jdk1.8-tests.repackaged.com.google.thirdparty</shadedPattern> - </relocation> - </relocations> - <transformers> - <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /> - </transformers> - </configuration> - </execution> - </executions> - </plugin> <!-- Overridden enforcer plugin for JDK1.8 for running tests --> <plugin> <groupId>org.apache.maven.plugins</groupId> @@ -178,11 +143,6 @@ <artifactId>beam-sdks-java-io-hadoop-input-format</artifactId> </dependency> <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - <version>${guava.version}</version> - </dependency> - <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> @@ -208,6 +168,12 @@ <scope>test</scope> </dependency> <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-io-common</artifactId> + <scope>test</scope> + <classifier>tests</classifier> + </dependency> + <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>2.6.2</version> http://git-wip-us.apache.org/repos/asf/beam/blob/b615013b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithElasticTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithElasticTest.java b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithElasticTest.java index 599a4a1..51cbd5a 100644 --- a/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithElasticTest.java +++ b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithElasticTest.java @@ -25,7 +25,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.Map; -import org.apache.beam.sdk.io.hadoop.inputformat.hashing.HashingFn; +import org.apache.beam.sdk.io.common.HashingFn; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Combine; @@ -105,7 +105,7 @@ public class HIFIOWithElasticTest implements Serializable { @Test public void testHifIOWithElastic() { // Expected hashcode is evaluated during insertion time one time and hardcoded here. - String expectedHashCode = "e2098f431f90193aa4545e033e6fd2217aafe7b6"; + String expectedHashCode = "a62a85f5f081e3840baf1028d4d6c6bc"; Configuration conf = getConfiguration(); PCollection<KV<Text, LinkedMapWritable>> esData = pipeline.apply(HadoopInputFormatIO.<Text, LinkedMapWritable>read().withConfiguration(conf)); @@ -135,7 +135,7 @@ public class HIFIOWithElasticTest implements Serializable { @Test public void testHifIOWithElasticQuery() { long expectedRowCount = 1L; - String expectedHashCode = "caa37dbd8258e3a7f98932958c819a57aab044ec"; + String expectedHashCode = "cfbf3e5c993d44e57535a114e25f782d"; Configuration conf = getConfiguration(); String fieldValue = ELASTIC_TYPE_ID_PREFIX + "2"; String query = "{" http://git-wip-us.apache.org/repos/asf/beam/blob/b615013b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/hashing/HashingFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/hashing/HashingFn.java b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/hashing/HashingFn.java deleted file mode 100644 index fe37048..0000000 --- a/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/hashing/HashingFn.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.beam.sdk.io.hadoop.inputformat.hashing; - -import com.google.common.collect.Lists; -import com.google.common.hash.HashCode; -import com.google.common.hash.Hashing; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.Serializable; -import java.nio.charset.StandardCharsets; -import java.util.List; - -import org.apache.beam.sdk.coders.CannotProvideCoderException; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderRegistry; -import org.apache.beam.sdk.coders.SerializableCoder; -import org.apache.beam.sdk.transforms.Combine.CombineFn; - -/** - * Custom Function for Hashing. The combiner is combineUnordered, and accumulator is a - * HashCode. - */ -public class HashingFn extends CombineFn<String, HashingFn.Accum, String> { - - /** - * Serializable Class to store the HashCode of input String. - */ - public static class Accum implements Serializable { - HashCode hashCode = null; - - public Accum(HashCode value) { - this.hashCode = value; - } - - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - in.defaultReadObject(); - } - - private void writeObject(ObjectOutputStream out) throws IOException { - out.defaultWriteObject(); - } - } - - @Override - public Accum addInput(Accum accum, String input) { - List<HashCode> elementHashes = Lists.newArrayList(); - if (accum.hashCode != null) { - elementHashes.add(accum.hashCode); - } - HashCode inputHashCode = Hashing.sha1().hashString(input, StandardCharsets.UTF_8); - elementHashes.add(inputHashCode); - accum.hashCode = Hashing.combineUnordered(elementHashes); - return accum; - } - - @Override - public Accum mergeAccumulators(Iterable<Accum> accums) { - Accum merged = createAccumulator(); - List<HashCode> elementHashes = Lists.newArrayList(); - for (Accum accum : accums) { - if (accum.hashCode != null) { - elementHashes.add(accum.hashCode); - } - } - merged.hashCode = Hashing.combineUnordered(elementHashes); - return merged; - } - - @Override - public String extractOutput(Accum accum) { - // Return the combined hash code of list of elements in the Pcollection. - String consolidatedHash = ""; - if (accum.hashCode != null) { - consolidatedHash = accum.hashCode.toString(); - } - return consolidatedHash; - } - - @Override - public Coder<Accum> getAccumulatorCoder(CoderRegistry registry, Coder<String> inputCoder) - throws CannotProvideCoderException { - return SerializableCoder.of(Accum.class); - } - - @Override - public Coder<String> getDefaultOutputCoder(CoderRegistry registry, Coder<String> inputCoder) { - return inputCoder; - } - - @Override - public Accum createAccumulator() { - return new Accum(null); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/b615013b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/integration/tests/HIFIOCassandraIT.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/integration/tests/HIFIOCassandraIT.java b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/integration/tests/HIFIOCassandraIT.java index bf9a5fd..bf4cb92 100644 --- a/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/integration/tests/HIFIOCassandraIT.java +++ b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/integration/tests/HIFIOCassandraIT.java @@ -21,9 +21,9 @@ import com.datastax.driver.core.Row; import java.io.Serializable; +import org.apache.beam.sdk.io.common.HashingFn; import org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO; import org.apache.beam.sdk.io.hadoop.inputformat.custom.options.HIFTestOptions; -import org.apache.beam.sdk.io.hadoop.inputformat.hashing.HashingFn; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; http://git-wip-us.apache.org/repos/asf/beam/blob/b615013b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/integration/tests/HIFIOElasticIT.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/integration/tests/HIFIOElasticIT.java b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/integration/tests/HIFIOElasticIT.java index 13c0cbc..65ef8f2 100644 --- a/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/integration/tests/HIFIOElasticIT.java +++ b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/integration/tests/HIFIOElasticIT.java @@ -17,9 +17,9 @@ package org.apache.beam.sdk.io.hadoop.inputformat.integration.tests; import java.io.IOException; import java.io.Serializable; +import org.apache.beam.sdk.io.common.HashingFn; import org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO; import org.apache.beam.sdk.io.hadoop.inputformat.custom.options.HIFTestOptions; -import org.apache.beam.sdk.io.hadoop.inputformat.hashing.HashingFn; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline;
