Repository: beam
Updated Branches:
  refs/heads/master b261d4890 -> 9e4b140ed


[BEAM-1541] Create hadoop-common and refactor HdfsIO and HBaseIO to use it


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d1f6a8af
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d1f6a8af
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d1f6a8af

Branch: refs/heads/master
Commit: d1f6a8af12d7db01c45c269551b1e3ec92a8afa0
Parents: b261d48
Author: Ismaël Mejía <[email protected]>
Authored: Thu Feb 23 21:13:19 2017 +0100
Committer: Dan Halperin <[email protected]>
Committed: Mon Feb 27 08:43:34 2017 -0800

----------------------------------------------------------------------
 pom.xml                                         |  6 ++
 sdks/java/io/hadoop-common/pom.xml              | 76 ++++++++++++++++
 .../io/hadoop/SerializableConfiguration.java    | 96 ++++++++++++++++++++
 .../apache/beam/sdk/io/hadoop/package-info.java | 22 +++++
 .../hadoop/SerializableConfigurationTest.java   | 75 +++++++++++++++
 sdks/java/io/hbase/pom.xml                      |  5 +
 .../org/apache/beam/sdk/io/hbase/HBaseIO.java   | 20 ++--
 .../hbase/coders/SerializableConfiguration.java | 50 ----------
 sdks/java/io/hdfs/pom.xml                       |  8 +-
 .../apache/beam/sdk/io/hdfs/HDFSFileSink.java   |  1 +
 .../apache/beam/sdk/io/hdfs/HDFSFileSource.java |  1 +
 .../sdk/io/hdfs/SerializableConfiguration.java  | 93 -------------------
 sdks/java/io/pom.xml                            | 32 +++++++
 sdks/java/javadoc/pom.xml                       |  5 +
 14 files changed, 334 insertions(+), 156 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d1f6a8af/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f3dc265..65f6723 100644
--- a/pom.xml
+++ b/pom.xml
@@ -379,6 +379,12 @@
 
       <dependency>
         <groupId>org.apache.beam</groupId>
+        <artifactId>beam-sdks-java-io-hadoop-common</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.beam</groupId>
         <artifactId>beam-sdks-java-io-hbase</artifactId>
         <version>${project.version}</version>
       </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/d1f6a8af/sdks/java/io/hadoop-common/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-common/pom.xml 
b/sdks/java/io/hadoop-common/pom.xml
new file mode 100644
index 0000000..13e159c
--- /dev/null
+++ b/sdks/java/io/hadoop-common/pom.xml
@@ -0,0 +1,76 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-sdks-java-io-parent</artifactId>
+    <version>0.6.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-sdks-java-io-hadoop-common</artifactId>
+  <name>Apache Beam :: SDKs :: Java :: IO :: Hadoop Common</name>
+  <description>Library to add shared Hadoop classes among Beam 
IOs.</description>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>jsr305</artifactId>
+    </dependency>
+
+    <!-- test dependencies -->
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/beam/blob/d1f6a8af/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/SerializableConfiguration.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/SerializableConfiguration.java
 
b/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/SerializableConfiguration.java
new file mode 100644
index 0000000..10b4ed2
--- /dev/null
+++ 
b/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/SerializableConfiguration.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hadoop;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+
+/**
+ * A wrapper to allow Hadoop {@link Configuration}s to be serialized using 
Java's standard
+ * serialization mechanisms.
+ */
+public class SerializableConfiguration implements Externalizable {
+  private static final long serialVersionUID = 0L;
+
+  private transient Configuration conf;
+
+  public SerializableConfiguration() {
+  }
+
+  public SerializableConfiguration(Configuration conf) {
+    if (conf == null) {
+      throw new NullPointerException("Configuration must not be null.");
+    }
+    this.conf = conf;
+  }
+
+  public Configuration get() {
+    return conf;
+  }
+
+  @Override
+  public void writeExternal(ObjectOutput out) throws IOException {
+    out.writeInt(conf.size());
+    for (Map.Entry<String, String> entry : conf) {
+      out.writeUTF(entry.getKey());
+      out.writeUTF(entry.getValue());
+    }
+  }
+
+  @Override
+  public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+    conf = new Configuration(false);
+    int size = in.readInt();
+    for (int i = 0; i < size; i++) {
+      conf.set(in.readUTF(), in.readUTF());
+    }
+  }
+
+  /**
+   * Returns new configured {@link Job} object.
+   */
+  public static Job newJob(@Nullable SerializableConfiguration conf) throws 
IOException {
+    if (conf == null) {
+      return Job.getInstance();
+    } else {
+      Job job = Job.getInstance();
+      for (Map.Entry<String, String> entry : conf.get()) {
+        job.getConfiguration().set(entry.getKey(), entry.getValue());
+      }
+      return job;
+    }
+  }
+
+  /**
+   * Returns new populated {@link Configuration} object.
+   */
+  public static Configuration newConfiguration(@Nullable 
SerializableConfiguration conf) {
+    if (conf == null) {
+      return new Configuration();
+    } else {
+      return conf.get();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/d1f6a8af/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/package-info.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/package-info.java
 
b/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/package-info.java
new file mode 100644
index 0000000..8be6128
--- /dev/null
+++ 
b/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Classes shared by Hadoop based IOs.
+ */
+package org.apache.beam.sdk.io.hadoop;

http://git-wip-us.apache.org/repos/asf/beam/blob/d1f6a8af/sdks/java/io/hadoop-common/src/test/java/org/apache/beam/sdk/io/hadoop/SerializableConfigurationTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/hadoop-common/src/test/java/org/apache/beam/sdk/io/hadoop/SerializableConfigurationTest.java
 
b/sdks/java/io/hadoop-common/src/test/java/org/apache/beam/sdk/io/hadoop/SerializableConfigurationTest.java
new file mode 100644
index 0000000..b47b40b
--- /dev/null
+++ 
b/sdks/java/io/hadoop-common/src/test/java/org/apache/beam/sdk/io/hadoop/SerializableConfigurationTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hadoop;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for SerializableConfiguration.
+ */
+@RunWith(JUnit4.class)
+public class SerializableConfigurationTest {
+  @Rule public  final ExpectedException thrown = ExpectedException.none();
+  private static final SerializableConfiguration DEFAULT_SERIALIZABLE_CONF =
+          new SerializableConfiguration(new Configuration());
+
+  @Test
+  public void testSerializationDeserialization() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set("hadoop.silly.test", "test-value");
+    byte[] object = SerializationUtils.serialize(new 
SerializableConfiguration(conf));
+    SerializableConfiguration serConf = SerializationUtils.deserialize(object);
+    assertNotNull(serConf);
+    assertEquals(serConf.get().get("hadoop.silly.test"), "test-value");
+  }
+
+  @Test
+  public void testConstruction() {
+    assertNotNull(DEFAULT_SERIALIZABLE_CONF);
+    assertNotNull(DEFAULT_SERIALIZABLE_CONF.get());
+    thrown.expect(NullPointerException.class);
+    new SerializableConfiguration(null);
+  }
+
+  @Test
+  public void testCreateNewConfiguration() throws Exception {
+    Configuration confFromNull = 
SerializableConfiguration.newConfiguration(null);
+    assertNotNull(confFromNull);
+    Configuration conf =
+            SerializableConfiguration.newConfiguration(new 
SerializableConfiguration(confFromNull));
+    assertNotNull(conf);
+  }
+
+  @Test
+  public void testCreateNewJob() throws Exception {
+    Job jobFromNull = SerializableConfiguration.newJob(null);
+    assertNotNull(jobFromNull);
+    Job job = SerializableConfiguration.newJob(DEFAULT_SERIALIZABLE_CONF);
+    assertNotNull(job);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/d1f6a8af/sdks/java/io/hbase/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/pom.xml b/sdks/java/io/hbase/pom.xml
index 3570316..f4a06a9 100644
--- a/sdks/java/io/hbase/pom.xml
+++ b/sdks/java/io/hbase/pom.xml
@@ -105,6 +105,11 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-hadoop-common</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/d1f6a8af/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java 
b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
index 3b5f4da..75f5615 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
@@ -39,9 +39,9 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
 import org.apache.beam.sdk.io.hbase.coders.HBaseMutationCoder;
 import org.apache.beam.sdk.io.hbase.coders.HBaseResultCoder;
-import org.apache.beam.sdk.io.hbase.coders.SerializableConfiguration;
 import org.apache.beam.sdk.io.hbase.coders.SerializableScan;
 import org.apache.beam.sdk.io.range.ByteKey;
 import org.apache.beam.sdk.io.range.ByteKeyRange;
@@ -267,7 +267,7 @@ public class HBaseIO {
                     "Configuration not provided");
             checkArgument(!tableId.isEmpty(), "Table ID not specified");
             try (Connection connection = ConnectionFactory.createConnection(
-                    serializableConfiguration.getConfiguration())) {
+                    serializableConfiguration.get())) {
                 Admin admin = connection.getAdmin();
                 checkArgument(admin.tableExists(TableName.valueOf(tableId)),
                         "Table %s does not exist", tableId);
@@ -280,7 +280,7 @@ public class HBaseIO {
         public void populateDisplayData(DisplayData.Builder builder) {
             super.populateDisplayData(builder);
             builder.add(DisplayData.item("configuration",
-                    serializableConfiguration.getConfiguration().toString()));
+                    serializableConfiguration.get().toString()));
             builder.add(DisplayData.item("tableId", tableId));
             builder.addIfNotNull(DisplayData.item("scan", 
serializableScan.getScan().toString()));
         }
@@ -290,7 +290,7 @@ public class HBaseIO {
         }
 
         public Configuration getConfiguration() {
-            return serializableConfiguration.getConfiguration();
+            return serializableConfiguration.get();
         }
 
         /**
@@ -333,7 +333,7 @@ public class HBaseIO {
         private long estimateSizeBytes() throws Exception {
             // This code is based on RegionSizeCalculator in hbase-server
             long estimatedSizeBytes = 0L;
-            Configuration configuration = 
this.read.serializableConfiguration.getConfiguration();
+            Configuration configuration = 
this.read.serializableConfiguration.get();
             try (Connection connection = 
ConnectionFactory.createConnection(configuration)) {
                 // filter regions for the given table/scan
                 List<HRegionLocation> regionLocations = 
getRegionLocations(connection);
@@ -490,7 +490,7 @@ public class HBaseIO {
 
         @Override
         public boolean start() throws IOException {
-            Configuration configuration = 
source.read.serializableConfiguration.getConfiguration();
+            Configuration configuration = 
source.read.serializableConfiguration.get();
             String tableId = source.read.tableId;
             connection = ConnectionFactory.createConnection(configuration);
             TableName tableName = TableName.valueOf(tableId);
@@ -592,7 +592,7 @@ public class HBaseIO {
             checkArgument(serializableConfiguration != null, "Configuration 
not specified");
             checkArgument(!tableId.isEmpty(), "Table ID not specified");
             try (Connection connection = ConnectionFactory.createConnection(
-                    serializableConfiguration.getConfiguration())) {
+                    serializableConfiguration.get())) {
                 Admin admin = connection.getAdmin();
                 checkArgument(admin.tableExists(TableName.valueOf(tableId)),
                         "Table %s does not exist", tableId);
@@ -605,7 +605,7 @@ public class HBaseIO {
         public void populateDisplayData(DisplayData.Builder builder) {
             super.populateDisplayData(builder);
             builder.add(DisplayData.item("configuration",
-                    serializableConfiguration.getConfiguration().toString()));
+                    serializableConfiguration.get().toString()));
             builder.add(DisplayData.item("tableId", tableId));
         }
 
@@ -614,7 +614,7 @@ public class HBaseIO {
         }
 
         public Configuration getConfiguration() {
-            return serializableConfiguration.getConfiguration();
+            return serializableConfiguration.get();
         }
 
         private final String tableId;
@@ -631,7 +631,7 @@ public class HBaseIO {
 
             @Setup
             public void setup() throws Exception {
-                Configuration configuration = 
this.serializableConfiguration.getConfiguration();
+                Configuration configuration = 
this.serializableConfiguration.get();
                 connection = ConnectionFactory.createConnection(configuration);
 
                 TableName tableName = TableName.valueOf(tableId);

http://git-wip-us.apache.org/repos/asf/beam/blob/d1f6a8af/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/SerializableConfiguration.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/SerializableConfiguration.java
 
b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/SerializableConfiguration.java
deleted file mode 100644
index de479de..0000000
--- 
a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/SerializableConfiguration.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hbase.coders;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * This is just a wrapper class to serialize Hadoop/HBase {@link 
Configuration}.
- */
-public class SerializableConfiguration implements Serializable {
-    private transient Configuration conf;
-
-    public SerializableConfiguration(Configuration conf) {
-        this.conf = conf;
-    }
-
-    private void writeObject(ObjectOutputStream out) throws IOException {
-        out.defaultWriteObject();
-        conf.write(out);
-    }
-
-    private void readObject(ObjectInputStream in) throws IOException {
-        conf = new Configuration(false);
-        conf.readFields(in);
-    }
-
-    public Configuration getConfiguration() {
-        return conf;
-    }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/d1f6a8af/sdks/java/io/hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/pom.xml b/sdks/java/io/hdfs/pom.xml
index f857a22..1212b0e 100644
--- a/sdks/java/io/hdfs/pom.xml
+++ b/sdks/java/io/hdfs/pom.xml
@@ -89,6 +89,11 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-hadoop-common</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-annotations</artifactId>
     </dependency>
@@ -142,21 +147,18 @@
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-client</artifactId>
-      <version>2.7.0</version>
       <scope>provided</scope>
     </dependency>
 
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
-      <version>2.7.0</version>
       <scope>provided</scope>
     </dependency>
 
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-mapreduce-client-core</artifactId>
-      <version>2.7.0</version>
       <scope>provided</scope>
     </dependency>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/d1f6a8af/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java 
b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
index 168bac7..0118249 100644
--- 
a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
+++ 
b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
@@ -37,6 +37,7 @@ import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.Sink;
+import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.KV;

http://git-wip-us.apache.org/repos/asf/beam/blob/d1f6a8af/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
 
b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
index 8e12561..2a731fb 100644
--- 
a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
+++ 
b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
@@ -47,6 +47,7 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.util.CoderUtils;

http://git-wip-us.apache.org/repos/asf/beam/blob/d1f6a8af/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/SerializableConfiguration.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/SerializableConfiguration.java
 
b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/SerializableConfiguration.java
deleted file mode 100644
index 0772e57..0000000
--- 
a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/SerializableConfiguration.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.Map;
-import javax.annotation.Nullable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Job;
-
-/**
- * A wrapper to allow Hadoop {@link Configuration}s to be serialized using 
Java's standard
- * serialization mechanisms.
- */
-public class SerializableConfiguration implements Externalizable {
-  private static final long serialVersionUID = 0L;
-
-  private Configuration conf;
-
-  public SerializableConfiguration() {
-  }
-
-  public SerializableConfiguration(Configuration conf) {
-    this.conf = conf;
-  }
-
-  public Configuration get() {
-    return conf;
-  }
-
-  @Override
-  public void writeExternal(ObjectOutput out) throws IOException {
-    out.writeInt(conf.size());
-    for (Map.Entry<String, String> entry : conf) {
-      out.writeUTF(entry.getKey());
-      out.writeUTF(entry.getValue());
-    }
-  }
-
-  @Override
-  public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
-    conf = new Configuration(false);
-    int size = in.readInt();
-    for (int i = 0; i < size; i++) {
-      conf.set(in.readUTF(), in.readUTF());
-    }
-  }
-
-  /**
-   * Returns new configured {@link Job} object.
-   */
-  public static Job newJob(@Nullable SerializableConfiguration conf) throws 
IOException {
-    if (conf == null) {
-      return Job.getInstance();
-    } else {
-      Job job = Job.getInstance();
-      for (Map.Entry<String, String> entry : conf.get()) {
-        job.getConfiguration().set(entry.getKey(), entry.getValue());
-      }
-      return job;
-    }
-  }
-
-  /**
-   * Returns new populated {@link Configuration} object.
-   */
-  public static Configuration newConfiguration(@Nullable 
SerializableConfiguration conf) {
-    if (conf == null) {
-      return new Configuration();
-    } else {
-      return conf.get();
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/d1f6a8af/sdks/java/io/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
index b518b70..082f26b 100644
--- a/sdks/java/io/pom.xml
+++ b/sdks/java/io/pom.xml
@@ -32,9 +32,41 @@
   <description>Beam SDK Java IO provides different connectivity components
   (sources and sinks) to consume and produce data from systems.</description>
 
+  <properties>
+    <!--
+      This is the version of Hadoop used to compile the hadoop-common module.
+      This dependency is defined with a provided scope.
+      Users must supply their own Hadoop version at runtime.
+    -->
+    <hadoop.version>2.7.3</hadoop.version>
+  </properties>
+
+  <dependencyManagement>
+    <dependencies>
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-client</artifactId>
+        <version>${hadoop.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-common</artifactId>
+        <version>${hadoop.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-mapreduce-client-core</artifactId>
+        <version>${hadoop.version}</version>
+      </dependency>
+    </dependencies>
+  </dependencyManagement>
+
   <modules>
     <module>elasticsearch</module>
     <module>google-cloud-platform</module>
+    <module>hadoop-common</module>
     <module>hbase</module>
     <module>hdfs</module>
     <module>jdbc</module>

http://git-wip-us.apache.org/repos/asf/beam/blob/d1f6a8af/sdks/java/javadoc/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/javadoc/pom.xml b/sdks/java/javadoc/pom.xml
index 3f5e8cc..145dcf0 100644
--- a/sdks/java/javadoc/pom.xml
+++ b/sdks/java/javadoc/pom.xml
@@ -114,6 +114,11 @@
 
     <dependency>
       <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-hadoop-common</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
       <artifactId>beam-sdks-java-io-hbase</artifactId>
     </dependency>
 

Reply via email to