Repository: beam Updated Branches: refs/heads/master b261d4890 -> 9e4b140ed
[BEAM-1541] Create hadoop-common and refactor HdfsIO and HBaseIO to use it Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d1f6a8af Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d1f6a8af Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d1f6a8af Branch: refs/heads/master Commit: d1f6a8af12d7db01c45c269551b1e3ec92a8afa0 Parents: b261d48 Author: Ismaël Mejía <[email protected]> Authored: Thu Feb 23 21:13:19 2017 +0100 Committer: Dan Halperin <[email protected]> Committed: Mon Feb 27 08:43:34 2017 -0800 ---------------------------------------------------------------------- pom.xml | 6 ++ sdks/java/io/hadoop-common/pom.xml | 76 ++++++++++++++++ .../io/hadoop/SerializableConfiguration.java | 96 ++++++++++++++++++++ .../apache/beam/sdk/io/hadoop/package-info.java | 22 +++++ .../hadoop/SerializableConfigurationTest.java | 75 +++++++++++++++ sdks/java/io/hbase/pom.xml | 5 + .../org/apache/beam/sdk/io/hbase/HBaseIO.java | 20 ++-- .../hbase/coders/SerializableConfiguration.java | 50 ---------- sdks/java/io/hdfs/pom.xml | 8 +- .../apache/beam/sdk/io/hdfs/HDFSFileSink.java | 1 + .../apache/beam/sdk/io/hdfs/HDFSFileSource.java | 1 + .../sdk/io/hdfs/SerializableConfiguration.java | 93 ------------------- sdks/java/io/pom.xml | 32 +++++++ sdks/java/javadoc/pom.xml | 5 + 14 files changed, 334 insertions(+), 156 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/d1f6a8af/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index f3dc265..65f6723 100644 --- a/pom.xml +++ b/pom.xml @@ -379,6 +379,12 @@ <dependency> <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-io-hadoop-common</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-io-hbase</artifactId> 
<version>${project.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/beam/blob/d1f6a8af/sdks/java/io/hadoop-common/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop-common/pom.xml b/sdks/java/io/hadoop-common/pom.xml new file mode 100644 index 0000000..13e159c --- /dev/null +++ b/sdks/java/io/hadoop-common/pom.xml @@ -0,0 +1,76 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-io-parent</artifactId> + <version>0.6.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <artifactId>beam-sdks-java-io-hadoop-common</artifactId> + <name>Apache Beam :: SDKs :: Java :: IO :: Hadoop Common</name> + <description>Library to add shared Hadoop classes among Beam IOs.</description> + + <dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>com.google.code.findbugs</groupId> + <artifactId>jsr305</artifactId> + </dependency> + + <!-- test dependencies --> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/beam/blob/d1f6a8af/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/SerializableConfiguration.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/SerializableConfiguration.java 
b/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/SerializableConfiguration.java new file mode 100644 index 0000000..10b4ed2 --- /dev/null +++ b/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/SerializableConfiguration.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.hadoop; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.Job; + +/** + * A wrapper to allow Hadoop {@link Configuration}s to be serialized using Java's standard + * serialization mechanisms. 
+ */ +public class SerializableConfiguration implements Externalizable { + private static final long serialVersionUID = 0L; + + private transient Configuration conf; + + public SerializableConfiguration() { + } + + public SerializableConfiguration(Configuration conf) { + if (conf == null) { + throw new NullPointerException("Configuration must not be null."); + } + this.conf = conf; + } + + public Configuration get() { + return conf; + } + + @Override + public void writeExternal(ObjectOutput out) throws IOException { + out.writeInt(conf.size()); + for (Map.Entry<String, String> entry : conf) { + out.writeUTF(entry.getKey()); + out.writeUTF(entry.getValue()); + } + } + + @Override + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + conf = new Configuration(false); + int size = in.readInt(); + for (int i = 0; i < size; i++) { + conf.set(in.readUTF(), in.readUTF()); + } + } + + /** + * Returns new configured {@link Job} object. + */ + public static Job newJob(@Nullable SerializableConfiguration conf) throws IOException { + if (conf == null) { + return Job.getInstance(); + } else { + Job job = Job.getInstance(); + for (Map.Entry<String, String> entry : conf.get()) { + job.getConfiguration().set(entry.getKey(), entry.getValue()); + } + return job; + } + } + + /** + * Returns new populated {@link Configuration} object. 
+ */ + public static Configuration newConfiguration(@Nullable SerializableConfiguration conf) { + if (conf == null) { + return new Configuration(); + } else { + return conf.get(); + } + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/d1f6a8af/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/package-info.java b/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/package-info.java new file mode 100644 index 0000000..8be6128 --- /dev/null +++ b/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Classes shared by Hadoop based IOs. 
+ */ +package org.apache.beam.sdk.io.hadoop; http://git-wip-us.apache.org/repos/asf/beam/blob/d1f6a8af/sdks/java/io/hadoop-common/src/test/java/org/apache/beam/sdk/io/hadoop/SerializableConfigurationTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop-common/src/test/java/org/apache/beam/sdk/io/hadoop/SerializableConfigurationTest.java b/sdks/java/io/hadoop-common/src/test/java/org/apache/beam/sdk/io/hadoop/SerializableConfigurationTest.java new file mode 100644 index 0000000..b47b40b --- /dev/null +++ b/sdks/java/io/hadoop-common/src/test/java/org/apache/beam/sdk/io/hadoop/SerializableConfigurationTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.hadoop; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import org.apache.commons.lang3.SerializationUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.Job; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for SerializableConfiguration. 
+ */ +@RunWith(JUnit4.class) +public class SerializableConfigurationTest { + @Rule public final ExpectedException thrown = ExpectedException.none(); + private static final SerializableConfiguration DEFAULT_SERIALIZABLE_CONF = + new SerializableConfiguration(new Configuration()); + + @Test + public void testSerializationDeserialization() throws Exception { + Configuration conf = new Configuration(); + conf.set("hadoop.silly.test", "test-value"); + byte[] object = SerializationUtils.serialize(new SerializableConfiguration(conf)); + SerializableConfiguration serConf = SerializationUtils.deserialize(object); + assertNotNull(serConf); + assertEquals(serConf.get().get("hadoop.silly.test"), "test-value"); + } + + @Test + public void testConstruction() { + assertNotNull(DEFAULT_SERIALIZABLE_CONF); + assertNotNull(DEFAULT_SERIALIZABLE_CONF.get()); + thrown.expect(NullPointerException.class); + new SerializableConfiguration(null); + } + + @Test + public void testCreateNewConfiguration() throws Exception { + Configuration confFromNull = SerializableConfiguration.newConfiguration(null); + assertNotNull(confFromNull); + Configuration conf = + SerializableConfiguration.newConfiguration(new SerializableConfiguration(confFromNull)); + assertNotNull(conf); + } + + @Test + public void testCreateNewJob() throws Exception { + Job jobFromNull = SerializableConfiguration.newJob(null); + assertNotNull(jobFromNull); + Job job = SerializableConfiguration.newJob(DEFAULT_SERIALIZABLE_CONF); + assertNotNull(job); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/d1f6a8af/sdks/java/io/hbase/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/pom.xml b/sdks/java/io/hbase/pom.xml index 3570316..f4a06a9 100644 --- a/sdks/java/io/hbase/pom.xml +++ b/sdks/java/io/hbase/pom.xml @@ -105,6 +105,11 @@ </dependency> <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-io-hadoop-common</artifactId> + </dependency> 
+ + <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/beam/blob/d1f6a8af/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java index 3b5f4da..75f5615 100644 --- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java @@ -39,9 +39,9 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.hadoop.SerializableConfiguration; import org.apache.beam.sdk.io.hbase.coders.HBaseMutationCoder; import org.apache.beam.sdk.io.hbase.coders.HBaseResultCoder; -import org.apache.beam.sdk.io.hbase.coders.SerializableConfiguration; import org.apache.beam.sdk.io.hbase.coders.SerializableScan; import org.apache.beam.sdk.io.range.ByteKey; import org.apache.beam.sdk.io.range.ByteKeyRange; @@ -267,7 +267,7 @@ public class HBaseIO { "Configuration not provided"); checkArgument(!tableId.isEmpty(), "Table ID not specified"); try (Connection connection = ConnectionFactory.createConnection( - serializableConfiguration.getConfiguration())) { + serializableConfiguration.get())) { Admin admin = connection.getAdmin(); checkArgument(admin.tableExists(TableName.valueOf(tableId)), "Table %s does not exist", tableId); @@ -280,7 +280,7 @@ public class HBaseIO { public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); builder.add(DisplayData.item("configuration", - serializableConfiguration.getConfiguration().toString())); + serializableConfiguration.get().toString())); 
builder.add(DisplayData.item("tableId", tableId)); builder.addIfNotNull(DisplayData.item("scan", serializableScan.getScan().toString())); } @@ -290,7 +290,7 @@ public class HBaseIO { } public Configuration getConfiguration() { - return serializableConfiguration.getConfiguration(); + return serializableConfiguration.get(); } /** @@ -333,7 +333,7 @@ public class HBaseIO { private long estimateSizeBytes() throws Exception { // This code is based on RegionSizeCalculator in hbase-server long estimatedSizeBytes = 0L; - Configuration configuration = this.read.serializableConfiguration.getConfiguration(); + Configuration configuration = this.read.serializableConfiguration.get(); try (Connection connection = ConnectionFactory.createConnection(configuration)) { // filter regions for the given table/scan List<HRegionLocation> regionLocations = getRegionLocations(connection); @@ -490,7 +490,7 @@ public class HBaseIO { @Override public boolean start() throws IOException { - Configuration configuration = source.read.serializableConfiguration.getConfiguration(); + Configuration configuration = source.read.serializableConfiguration.get(); String tableId = source.read.tableId; connection = ConnectionFactory.createConnection(configuration); TableName tableName = TableName.valueOf(tableId); @@ -592,7 +592,7 @@ public class HBaseIO { checkArgument(serializableConfiguration != null, "Configuration not specified"); checkArgument(!tableId.isEmpty(), "Table ID not specified"); try (Connection connection = ConnectionFactory.createConnection( - serializableConfiguration.getConfiguration())) { + serializableConfiguration.get())) { Admin admin = connection.getAdmin(); checkArgument(admin.tableExists(TableName.valueOf(tableId)), "Table %s does not exist", tableId); @@ -605,7 +605,7 @@ public class HBaseIO { public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); builder.add(DisplayData.item("configuration", - 
serializableConfiguration.getConfiguration().toString())); + serializableConfiguration.get().toString())); builder.add(DisplayData.item("tableId", tableId)); } @@ -614,7 +614,7 @@ public class HBaseIO { } public Configuration getConfiguration() { - return serializableConfiguration.getConfiguration(); + return serializableConfiguration.get(); } private final String tableId; @@ -631,7 +631,7 @@ public class HBaseIO { @Setup public void setup() throws Exception { - Configuration configuration = this.serializableConfiguration.getConfiguration(); + Configuration configuration = this.serializableConfiguration.get(); connection = ConnectionFactory.createConnection(configuration); TableName tableName = TableName.valueOf(tableId); http://git-wip-us.apache.org/repos/asf/beam/blob/d1f6a8af/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/SerializableConfiguration.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/SerializableConfiguration.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/SerializableConfiguration.java deleted file mode 100644 index de479de..0000000 --- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/SerializableConfiguration.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.hbase.coders; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.Serializable; - -import org.apache.hadoop.conf.Configuration; - -/** - * This is just a wrapper class to serialize Hadoop/HBase {@link Configuration}. - */ -public class SerializableConfiguration implements Serializable { - private transient Configuration conf; - - public SerializableConfiguration(Configuration conf) { - this.conf = conf; - } - - private void writeObject(ObjectOutputStream out) throws IOException { - out.defaultWriteObject(); - conf.write(out); - } - - private void readObject(ObjectInputStream in) throws IOException { - conf = new Configuration(false); - conf.readFields(in); - } - - public Configuration getConfiguration() { - return conf; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/d1f6a8af/sdks/java/io/hdfs/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/pom.xml b/sdks/java/io/hdfs/pom.xml index f857a22..1212b0e 100644 --- a/sdks/java/io/hdfs/pom.xml +++ b/sdks/java/io/hdfs/pom.xml @@ -89,6 +89,11 @@ </dependency> <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-io-hadoop-common</artifactId> + </dependency> + + <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-annotations</artifactId> </dependency> @@ -142,21 +147,18 @@ <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> - <version>2.7.0</version> 
<scope>provided</scope> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> - <version>2.7.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> - <version>2.7.0</version> <scope>provided</scope> </dependency> http://git-wip-us.apache.org/repos/asf/beam/blob/d1f6a8af/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java index 168bac7..0118249 100644 --- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java +++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java @@ -37,6 +37,7 @@ import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.Sink; +import org.apache.beam.sdk.io.hadoop.SerializableConfiguration; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.KV; http://git-wip-us.apache.org/repos/asf/beam/blob/d1f6a8af/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java index 8e12561..2a731fb 100644 --- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java +++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java @@ -47,6 +47,7 @@ import org.apache.beam.sdk.coders.KvCoder; import 
org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.hadoop.SerializableConfiguration; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.util.CoderUtils; http://git-wip-us.apache.org/repos/asf/beam/blob/d1f6a8af/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/SerializableConfiguration.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/SerializableConfiguration.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/SerializableConfiguration.java deleted file mode 100644 index 0772e57..0000000 --- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/SerializableConfiguration.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.io.hdfs; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.Map; -import javax.annotation.Nullable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.Job; - -/** - * A wrapper to allow Hadoop {@link Configuration}s to be serialized using Java's standard - * serialization mechanisms. - */ -public class SerializableConfiguration implements Externalizable { - private static final long serialVersionUID = 0L; - - private Configuration conf; - - public SerializableConfiguration() { - } - - public SerializableConfiguration(Configuration conf) { - this.conf = conf; - } - - public Configuration get() { - return conf; - } - - @Override - public void writeExternal(ObjectOutput out) throws IOException { - out.writeInt(conf.size()); - for (Map.Entry<String, String> entry : conf) { - out.writeUTF(entry.getKey()); - out.writeUTF(entry.getValue()); - } - } - - @Override - public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - conf = new Configuration(false); - int size = in.readInt(); - for (int i = 0; i < size; i++) { - conf.set(in.readUTF(), in.readUTF()); - } - } - - /** - * Returns new configured {@link Job} object. - */ - public static Job newJob(@Nullable SerializableConfiguration conf) throws IOException { - if (conf == null) { - return Job.getInstance(); - } else { - Job job = Job.getInstance(); - for (Map.Entry<String, String> entry : conf.get()) { - job.getConfiguration().set(entry.getKey(), entry.getValue()); - } - return job; - } - } - - /** - * Returns new populated {@link Configuration} object. 
- */ - public static Configuration newConfiguration(@Nullable SerializableConfiguration conf) { - if (conf == null) { - return new Configuration(); - } else { - return conf.get(); - } - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/d1f6a8af/sdks/java/io/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml index b518b70..082f26b 100644 --- a/sdks/java/io/pom.xml +++ b/sdks/java/io/pom.xml @@ -32,9 +32,41 @@ <description>Beam SDK Java IO provides different connectivity components (sources and sinks) to consume and produce data from systems.</description> + <properties> + <!-- + This is the version of Hadoop used to compile the hadoop-common module. + This dependency is defined with a provided scope. + Users must supply their own Hadoop version at runtime. + --> + <hadoop.version>2.7.3</hadoop.version> + </properties> + + <dependencyManagement> + <dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + <version>${hadoop.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + <version>${hadoop.version}</version> + </dependency> + </dependencies> + </dependencyManagement> + <modules> <module>elasticsearch</module> <module>google-cloud-platform</module> + <module>hadoop-common</module> <module>hbase</module> <module>hdfs</module> <module>jdbc</module> http://git-wip-us.apache.org/repos/asf/beam/blob/d1f6a8af/sdks/java/javadoc/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/javadoc/pom.xml b/sdks/java/javadoc/pom.xml index 3f5e8cc..145dcf0 100644 --- a/sdks/java/javadoc/pom.xml +++ b/sdks/java/javadoc/pom.xml @@ -114,6 +114,11 @@ 
<dependency> <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-io-hadoop-common</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-io-hbase</artifactId> </dependency>
