Repository: beam Updated Branches: refs/heads/master a3e7383cc -> 7478da997
[BEAM-2031] Add support to pass through zero or more Hadoop configurations through as PipelineOptions Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/864e2ad2 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/864e2ad2 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/864e2ad2 Branch: refs/heads/master Commit: 864e2ad26e3748b414d10c53c28e5e18fb4a53e7 Parents: a3e7383 Author: Luke Cwik <[email protected]> Authored: Fri Apr 28 10:30:24 2017 -0700 Committer: Luke Cwik <[email protected]> Committed: Fri Apr 28 16:14:34 2017 -0700 ---------------------------------------------------------------------- sdks/java/io/hdfs/pom.xml | 10 +++ .../sdk/io/hdfs/HadoopFileSystemModule.java | 84 ++++++++++++++++++++ .../sdk/io/hdfs/HadoopFileSystemOptions.java | 49 ++++++++++++ .../hdfs/HadoopFileSystemOptionsRegistrar.java | 35 ++++++++ .../sdk/io/hdfs/HadoopFileSystemModuleTest.java | 65 +++++++++++++++ .../HadoopFileSystemOptionsRegistrarTest.java | 49 ++++++++++++ .../io/hdfs/HadoopFileSystemOptionsTest.java | 48 +++++++++++ 7 files changed, 340 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/864e2ad2/sdks/java/io/hdfs/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/pom.xml b/sdks/java/io/hdfs/pom.xml index 91c2cf7..46cf8cf 100644 --- a/sdks/java/io/hdfs/pom.xml +++ b/sdks/java/io/hdfs/pom.xml @@ -56,6 +56,16 @@ </dependency> <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </dependency> + + <dependency> <groupId>com.google.auto.service</groupId> <artifactId>auto-service</artifactId> <optional>true</optional> 
http://git-wip-us.apache.org/repos/asf/beam/blob/864e2ad2/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModule.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModule.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModule.java new file mode 100644 index 0000000..2cb9d8a --- /dev/null +++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModule.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.auto.service.AutoService;
+import java.io.IOException;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A Jackson {@link Module} that lets an object mapper read and write Hadoop
+ * {@link Configuration} objects. It attaches a {@link JsonSerializer} and a
+ * {@link JsonDeserializer} to {@link Configuration} through a mixin class. The JSON form of a
+ * {@link Configuration} is a single JSON map of property names to property values.
+ *
+ * <p>Only the keys and their values are serialized; the configuration hierarchy and the source
+ * of each property are not preserved.
+ */
+@AutoService(Module.class)
+public class HadoopFileSystemModule extends SimpleModule {
+  public HadoopFileSystemModule() {
+    super("HadoopFileSystemModule");
+    setMixInAnnotation(Configuration.class, ConfigurationMixin.class);
+  }
+
+  /**
+   * Carries the Jackson annotations for the Hadoop {@link Configuration} class, which cannot be
+   * annotated directly.
+   */
+  @JsonDeserialize(using = ConfigurationDeserializer.class)
+  @JsonSerialize(using = ConfigurationSerializer.class)
+  private static class ConfigurationMixin {}
+
+  /** Writes a Hadoop {@link Configuration} as a JSON map of its properties. */
+  static class ConfigurationSerializer extends JsonSerializer<Configuration> {
+    @Override
+    public void serialize(Configuration configuration, JsonGenerator jsonGenerator,
+        SerializerProvider serializerProvider) throws IOException, JsonProcessingException {
+      // Copy the properties into a TreeMap so they are handed to the generator sorted by key.
+      Map<String, String> sortedProperties = new TreeMap<>();
+      for (Map.Entry<String, String> property : configuration) {
+        sortedProperties.put(property.getKey(), property.getValue());
+      }
+      jsonGenerator.writeObject(sortedProperties);
+    }
+  }
+
+  /** Reads a JSON map of properties back into a Hadoop {@link Configuration}. */
+  static class ConfigurationDeserializer extends JsonDeserializer<Configuration> {
+    @Override
+    public Configuration deserialize(JsonParser jsonParser,
+        DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
+      TypeReference<Map<String, String>> mapOfStrings =
+          new TypeReference<Map<String, String>>() {};
+      Map<String, String> properties = jsonParser.readValueAs(mapOfStrings);
+      // Constructed with 'false' so the result holds only the properties found in the JSON map.
+      Configuration restored = new Configuration(false);
+      for (Map.Entry<String, String> property : properties.entrySet()) {
+        restored.set(property.getKey(), property.getValue());
+      }
+      return restored;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/864e2ad2/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java
new file mode 100644
index 0000000..31250bc
--- /dev/null
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import java.util.List;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * {@link PipelineOptions} which encapsulate {@link Configuration Hadoop Configuration}
+ * for the {@link HadoopFileSystem}.
+ */
+public interface HadoopFileSystemOptions extends PipelineOptions {
+  /**
+   * Returns the Hadoop configurations, one per Hadoop filesystem. When the option has not been
+   * set, the value comes from {@link ConfigurationLocator}.
+   */
+  @Description("A list of Hadoop configurations used to configure zero or more Hadoop filesystems. "
+      + "To specify on the command-line, represent the value as a JSON list of JSON maps, where "
+      + "each map represents the entire configuration for a single Hadoop filesystem. For example "
+      + "--hdfsConfiguration='[{\"fs.default.name\": \"hdfs://localhost:9998\", ...},"
+      + "{\"fs.default.name\": \"s3a://\", ...},...]'")
+  @Default.InstanceFactory(ConfigurationLocator.class)
+  List<Configuration> getHdfsConfiguration();
+
+  /** Sets the Hadoop configurations, one per Hadoop filesystem. */
+  void setHdfsConfiguration(List<Configuration> value);
+
+  /**
+   * A {@link DefaultValueFactory} which supplies the default for
+   * {@link #getHdfsConfiguration()}.
+   *
+   * <p>The factory is typed as {@code List<Configuration>} so that it matches the type of the
+   * option it is attached to. A factory producing a bare {@link Configuration} would fail with a
+   * {@link ClassCastException} at the getter as soon as it returned a non-null value.
+   */
+  class ConfigurationLocator implements DefaultValueFactory<List<Configuration>> {
+    /**
+     * Returns the default list of Hadoop configurations.
+     *
+     * @param options the pipeline options the default is being computed for (currently unused)
+     * @return currently always {@code null}, because no default configuration is located yet
+     */
+    @Override
+    public List<Configuration> create(PipelineOptions options) {
+      // TODO: Find default configuration to use
+      return null;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/864e2ad2/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrar.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrar.java
new file mode 100644
index 0000000..344623b
--- /dev/null
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrar.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+
+/**
+ * {@link AutoService} registrar for {@link HadoopFileSystemOptions}.
+ */
+@AutoService(PipelineOptionsRegistrar.class)
+public class HadoopFileSystemOptionsRegistrar implements PipelineOptionsRegistrar {
+
+  /** Returns a one-element list holding {@link HadoopFileSystemOptions}. */
+  @Override
+  public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+    Class<? extends PipelineOptions> hadoopOptions = HadoopFileSystemOptions.class;
+    return ImmutableList.<Class<? extends PipelineOptions>>of(hadoopOptions);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/864e2ad2/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModuleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModuleTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModuleTest.java
new file mode 100644
index 0000000..6963116
--- /dev/null
+++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModuleTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import static org.hamcrest.Matchers.hasItem;
+import static org.junit.Assert.assertThat;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.AbstractMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
+import org.apache.hadoop.conf.Configuration;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Test for {@link HadoopFileSystemModule}.
+ */
+@RunWith(JUnit4.class)
+public class HadoopFileSystemModuleTest {
+  /** The module must be discoverable through Jackson's service-loader based module lookup. */
+  @Test
+  public void testObjectMapperIsAbleToFindModule() throws Exception {
+    List<Module> modules = ObjectMapper.findModules(ReflectHelpers.findClassLoader());
+    assertThat(modules, hasItem(Matchers.<Module>instanceOf(HadoopFileSystemModule.class)));
+  }
+
+  /**
+   * Round-trips a layered {@link Configuration} through JSON and checks that the result holds
+   * the flattened key/value pairs, with values set on the outer configuration overriding the
+   * base values.
+   */
+  @Test
+  public void testConfigurationSerializationDeserialization() throws Exception {
+    Configuration baseConfiguration = new Configuration(false);
+    baseConfiguration.set("testPropertyA", "baseA");
+    baseConfiguration.set("testPropertyC", "baseC");
+    Configuration configuration = new Configuration(false);
+    configuration.addResource(baseConfiguration);
+    configuration.set("testPropertyA", "A");
+    configuration.set("testPropertyB", "B");
+    ObjectMapper objectMapper = new ObjectMapper();
+    objectMapper.registerModule(new HadoopFileSystemModule());
+    String serializedConfiguration = objectMapper.writeValueAsString(configuration);
+    Configuration deserializedConfiguration =
+        objectMapper.readValue(serializedConfiguration, Configuration.class);
+    // Iterating a Configuration gives no ordering guarantee, so the assertion must not depend
+    // on the order in which the properties are returned.
+    assertThat(deserializedConfiguration,
+        Matchers.<Map.Entry<String, String>>containsInAnyOrder(
+            new AbstractMap.SimpleEntry<>("testPropertyA", "A"),
+            new AbstractMap.SimpleEntry<>("testPropertyB", "B"),
+            new AbstractMap.SimpleEntry<>("testPropertyC", "baseC")));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/864e2ad2/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrarTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrarTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrarTest.java new file mode 100644 index 0000000..2be3d93 --- /dev/null +++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrarTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.hdfs; + +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +import com.google.common.collect.Lists; +import java.util.ServiceLoader; +import org.apache.beam.sdk.options.PipelineOptionsRegistrar; +import org.hamcrest.Matchers; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link HadoopFileSystemOptionsRegistrar}. 
+ */
+@RunWith(JUnit4.class)
+public class HadoopFileSystemOptionsRegistrarTest {
+
+  /**
+   * Looks up all {@link PipelineOptionsRegistrar}s through the {@link ServiceLoader} and checks
+   * that the first {@link HadoopFileSystemOptionsRegistrar} found exposes exactly
+   * {@link HadoopFileSystemOptions}. Fails if no such registrar is found.
+   */
+  @Test
+  public void testServiceLoader() {
+    PipelineOptionsRegistrar hadoopRegistrar = null;
+    for (PipelineOptionsRegistrar candidate
+        : Lists.newArrayList(ServiceLoader.load(PipelineOptionsRegistrar.class).iterator())) {
+      if (candidate instanceof HadoopFileSystemOptionsRegistrar) {
+        hadoopRegistrar = candidate;
+        break;
+      }
+    }
+    if (hadoopRegistrar == null) {
+      fail("Expected to find " + HadoopFileSystemOptionsRegistrar.class);
+    }
+    assertThat(hadoopRegistrar.getPipelineOptions(),
+        Matchers.<Class<?>>contains(HadoopFileSystemOptions.class));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/864e2ad2/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsTest.java
new file mode 100644
index 0000000..634528b
--- /dev/null
+++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import java.util.AbstractMap;
+import java.util.Map;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link HadoopFileSystemOptions}.
+ */
+@RunWith(JUnit4.class)
+public class HadoopFileSystemOptionsTest {
+  /**
+   * Parses a JSON list of two JSON maps from the command line and checks that each map becomes
+   * its own Hadoop configuration holding exactly the property it was given.
+   */
+  @Test
+  public void testParsingHdfsConfiguration() {
+    HadoopFileSystemOptions options = PipelineOptionsFactory.fromArgs(
+        "--hdfsConfiguration=["
+            + "{\"propertyA\": \"A\"},"
+            + "{\"propertyB\": \"B\"}]").as(HadoopFileSystemOptions.class);
+    assertEquals(2, options.getHdfsConfiguration().size());
+    // Typed entries (instead of raw SimpleEntry) keep the matcher arguments type-checked.
+    assertThat(options.getHdfsConfiguration().get(0),
+        Matchers.<Map.Entry<String, String>>contains(
+            new AbstractMap.SimpleEntry<>("propertyA", "A")));
+    assertThat(options.getHdfsConfiguration().get(1),
+        Matchers.<Map.Entry<String, String>>contains(
+            new AbstractMap.SimpleEntry<>("propertyB", "B")));
+  }
+}
