[
https://issues.apache.org/jira/browse/BEAM-3702?focusedWorklogId=140691&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-140691
]
ASF GitHub Bot logged work on BEAM-3702:
----------------------------------------
Author: ASF GitHub Bot
Created on: 03/Sep/18 18:14
Start Date: 03/Sep/18 18:14
Worklog Time Spent: 10m
Work Description: stale[bot] closed pull request #4683: [BEAM-3702]
adding fromJvm to create pipelineoptions from the system properties
URL: https://github.com/apache/beam/pull/4683
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
index 7630fb3e167..3e724c04b18 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
@@ -20,12 +20,16 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.Locale.ROOT;
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.joining;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toSet;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Predicate;
@@ -38,9 +42,12 @@
import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
+import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
import com.google.common.collect.Ordering;
import com.google.common.collect.RowSortedTable;
import com.google.common.collect.Sets;
@@ -63,11 +70,13 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
+import java.util.Hashtable;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
+import java.util.Properties;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.SortedMap;
@@ -75,9 +84,10 @@
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import java.util.stream.StreamSupport;
-import javax.annotation.Nonnull;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.options.Validation.Required;
@@ -174,6 +184,13 @@ public static Builder fromArgs(String... args) {
return new Builder().fromArgs(args);
}
+ /**
+ * @return a builder instance enabling you to build pipeline options.
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
/**
* After creation we will validate that {@code <T>} conforms to all the
* validation criteria. See
@@ -187,20 +204,20 @@ public Builder withValidation() {
/** A fluent {@link PipelineOptions} builder. */
public static class Builder {
private final String defaultAppName;
- private final String[] args;
+ private final Collection<Function<Builder, ListMultimap<String, String>>>
options;
private final boolean validation;
private final boolean strictParsing;
private final boolean isCli;
// Do not allow direct instantiation
private Builder() {
- this(null, false, true, false);
+ this(new ArrayList<>(), false, true, false);
}
- private Builder(String[] args, boolean validation,
- boolean strictParsing, boolean isCli) {
+ private Builder(Collection<Function<Builder, ListMultimap<String,
String>>> options,
+ boolean validation, boolean strictParsing, boolean isCli) {
this.defaultAppName = findCallersClassName();
- this.args = args;
+ this.options = options;
this.validation = validation;
this.strictParsing = strictParsing;
this.isCli = isCli;
@@ -242,10 +259,36 @@ private Builder(String[] args, boolean validation,
* {@code --help=PipelineOptionsClassName} will print out detailed usage
information about the
* specifically requested PipelineOptions by invoking
* {@link PipelineOptionsFactory#printHelp(PrintStream, Class)}.
+ *
+ * <p>Important point to note is that since the release 2.5.0, calling
twice this method will
+ * <b>append</b> the arguments instead of replacing it.
*/
public Builder fromArgs(String... args) {
checkNotNull(args, "Arguments should not be null.");
- return new Builder(args, validation, strictParsing, true);
+ options.add(current -> parseCommandLine(args, current.strictParsing));
+ return new Builder(options, validation, strictParsing, true);
+ }
+
+ /**
+ * <p>A key/value set of options. This reuses the same logic than
+ * {@link PipelineOptionsFactory#fromArgs(String...)} but using
+ * the configuration parameter as input. Keys don't have <pre>--</pre>
+ * as a prefix in this case.</p>
+ *
+ * <p>Simple list style properties are able to be bound to {@code
boolean[]}, {@code char[]},
+ * {@code short[]}, {@code int[]}, {@code long[]}, {@code float[]}, {@code
double[]},
+ * {@code Class[]}, enum arrays, {@code String[]}, and {@code
List<String>}.
+ *
+ * <p>For more information about the argument handling please see
+ * {@link Builder#fromArgs(String...)}.
+ *
+ * @param configuration the options to convert to PipelineOptions.
+ * @return a new builder for pipeline options.
+ */
+ public Builder fromMap(final Map<String, String> configuration) {
+ checkNotNull(configuration, "Arguments should not be null.");
+ options.add(builder ->
LinkedListMultimap.create(Multimaps.forMap(configuration)));
+ return new Builder(options, validation, strictParsing, true);
}
/**
@@ -255,7 +298,7 @@ public Builder fromArgs(String... args) {
* validation.
*/
public Builder withValidation() {
- return new Builder(args, true, strictParsing, isCli);
+ return new Builder(options, true, strictParsing, isCli);
}
/**
@@ -263,7 +306,7 @@ public Builder withValidation() {
* arguments.
*/
public Builder withoutStrictParsing() {
- return new Builder(args, validation, false, isCli);
+ return new Builder(options, validation, false, isCli);
}
/**
@@ -289,12 +332,13 @@ public PipelineOptions create() {
public <T extends PipelineOptions> T as(Class<T> klass) {
Map<String, Object> initialOptions = Maps.newHashMap();
- // Attempt to parse the arguments into the set of initial options to use
- if (args != null) {
- ListMultimap<String, String> options = parseCommandLine(args,
strictParsing);
+ if (!options.isEmpty()) {
LOG.debug("Provided Arguments: {}", options);
- printHelpUsageAndExitIfNeeded(options, System.out, true /* exit */);
- initialOptions = parseObjects(klass, options, strictParsing);
+ final ListMultimap<String, String> optionsInstance = options.stream()
+ .map(fn -> fn.apply(this))
+ .collect(LinkedListMultimap::create, Multimap::putAll,
Multimap::putAll);
+ printHelpUsageAndExitIfNeeded(optionsInstance, System.out, true /*
exit */);
+ initialOptions = parseObjects(klass, optionsInstance, strictParsing);
}
// Create our proxy
@@ -333,7 +377,7 @@ public PipelineOptions create() {
* {@code printStream} and {@code exit} used for testing.
*/
@SuppressWarnings("unchecked")
- static boolean printHelpUsageAndExitIfNeeded(ListMultimap<String, String>
options,
+ static boolean printHelpUsageAndExitIfNeeded(Multimap<String, String>
options,
PrintStream printStream, boolean exit) {
if (options.containsKey("help")) {
final String helpOption = Iterables.getOnlyElement(options.get("help"));
@@ -903,10 +947,9 @@ private static void validateReturnType(Class<? extends
PipelineOptions> iface) {
List<MultipleDefinitions> multipleDefinitions = Lists.newArrayList();
for (Map.Entry<Method, Collection<Method>> entry
: methodNameToMethodMap.asMap().entrySet()) {
- Set<Class<?>> returnTypes = FluentIterable.from(entry.getValue())
- .transform(ReturnTypeFetchingFunction.INSTANCE).toSet();
- SortedSet<Method> collidingMethods =
FluentIterable.from(entry.getValue())
- .toSortedSet(MethodComparator.INSTANCE);
+ Set<Class<?>> returnTypes = entry.getValue().stream()
+ .map(ReturnTypeFetchingFunction.INSTANCE::apply).collect(toSet());
+ SortedSet<Method> collidingMethods =
SortedSet.class.cast(entry.getValue());
if (returnTypes.size() > 1) {
MultipleDefinitions defs = new MultipleDefinitions();
defs.method = entry.getKey();
@@ -972,52 +1015,34 @@ private static void
validateGettersHaveConsistentAnnotation(
SortedSet<Method> getters =
methodNameToAllMethodMap.get(descriptor.getReadMethod());
SortedSet<Method> gettersWithTheAnnotation =
Sets.filter(getters, annotationPredicates.forMethod);
- Set<Annotation> distinctAnnotations =
Sets.newLinkedHashSet(FluentIterable
- .from(gettersWithTheAnnotation)
- .transformAndConcat(new Function<Method, Iterable<? extends
Annotation>>() {
- @Nonnull
- @Override
- public Iterable<? extends Annotation> apply(@Nonnull Method
method) {
- return FluentIterable.from(method.getAnnotations());
- }
- })
- .filter(annotationPredicates.forAnnotation));
+ Set<Annotation> distinctAnnotations = gettersWithTheAnnotation.stream()
+ .flatMap(m -> Stream.of(m.getAnnotations()))
+ .filter(annotationPredicates.forAnnotation::apply)
+ .collect(toSet());
if (distinctAnnotations.size() > 1) {
throw new IllegalArgumentException(String.format(
"Property [%s] is marked with contradictory annotations. Found
[%s].",
descriptor.getName(),
- FluentIterable.from(gettersWithTheAnnotation)
- .transformAndConcat(new Function<Method, Iterable<String>>() {
- @Nonnull
- @Override
- public Iterable<String> apply(final @Nonnull Method method) {
- return FluentIterable.from(method.getAnnotations())
- .filter(annotationPredicates.forAnnotation)
- .transform(new Function<Annotation, String>() {
- @Nonnull
- @Override
- public String apply(@Nonnull Annotation annotation) {
- return String.format(
- "[%s on %s]",
-
ReflectHelpers.ANNOTATION_FORMATTER.apply(annotation),
-
ReflectHelpers.CLASS_AND_METHOD_FORMATTER.apply(method));
- }
- });
-
- }
- })
- .join(Joiner.on(", "))));
+ gettersWithTheAnnotation.stream()
+ .flatMap(method -> Stream.of(method.getAnnotations())
+ .filter(annotationPredicates.forAnnotation::apply)
+ .map(annotation -> String.format(
+ "[%s on %s]",
+ ReflectHelpers.ANNOTATION_FORMATTER.apply(annotation),
+
ReflectHelpers.CLASS_AND_METHOD_FORMATTER.apply(method))))
+ .collect(joining(", "))));
}
- Iterable<String> getterClassNames = FluentIterable.from(getters)
- .transform(MethodToDeclaringClassFunction.INSTANCE)
- .transform(ReflectHelpers.CLASS_NAME);
- Iterable<String> gettersWithTheAnnotationClassNames =
- FluentIterable.from(gettersWithTheAnnotation)
- .transform(MethodToDeclaringClassFunction.INSTANCE)
- .transform(ReflectHelpers.CLASS_NAME);
+ Iterable<String> getterClassNames = getters.stream()
+ .map(MethodToDeclaringClassFunction.INSTANCE::apply)
+ .map(ReflectHelpers.CLASS_NAME::apply)
+ .collect(toList());
+ Iterable<String> gettersWithTheAnnotationClassNames =
gettersWithTheAnnotation.stream()
+ .map(MethodToDeclaringClassFunction.INSTANCE::apply)
+ .map(ReflectHelpers.CLASS_NAME::apply)
+ .collect(toList());
if (!(gettersWithTheAnnotation.isEmpty()
|| getters.size() == gettersWithTheAnnotation.size())) {
@@ -1049,10 +1074,10 @@ private static void validateSettersDoNotHaveAnnotation(
methodNameToAllMethodMap.get(descriptor.getWriteMethod()),
annotationPredicates.forMethod);
- Iterable<String> settersWithTheAnnotationClassNames =
- FluentIterable.from(settersWithTheAnnotation)
- .transform(MethodToDeclaringClassFunction.INSTANCE)
- .transform(ReflectHelpers.CLASS_NAME);
+ Iterable<String> settersWithTheAnnotationClassNames =
settersWithTheAnnotation.stream()
+ .map(MethodToDeclaringClassFunction.INSTANCE::apply)
+ .map(ReflectHelpers.CLASS_NAME::apply)
+ .collect(toList());
if (!settersWithTheAnnotation.isEmpty()) {
AnnotatedSetter annotated = new AnnotatedSetter();
@@ -1797,4 +1822,112 @@ private synchronized void register(Class<? extends
PipelineOptions> iface) {
return combinedCache.get(interfaces).getPropertyDescriptors();
}
}
+
+ /**
+ * For more information please see {@link
PipelineOptionsFactory#fromMap(Map)}.
+ *
+ * @param prefix a prefix filter on the map keys.
+ * @param options the options to convert to a pipeline options.
+ * @return a pipeline options instance based on the specified configuration.
+ */
+ @Experimental
+ public static PipelineOptions fromMap(final String prefix, final Map<String,
String> options) {
+ checkArgument(prefix != null, "prefix should not be null.");
+ checkArgument(options != null, "options should not be null.");
+ final Map<String, String> filtered = options.entrySet()
+ .stream()
+ .filter(e -> e.getKey()
+ .startsWith(prefix))
+ .collect(toMap(Map.Entry::getKey,
Map.Entry::getValue));
+ return fromMap(filtered);
+ }
+
+ /**
+ * For more information please see {@link
PipelineOptionsFactory.Builder#fromMap(Map)}.
+ *
+ * @param options the options to convert to a pipeline options.
+ * @return a pipeline options instance based on the specified configuration.
+ */
+ public static PipelineOptions fromMap(final Map<String, String> options) {
+ checkArgument(options != null, "options should not be null.");
+ return new Builder().fromMap(options).create();
+ }
+
+ /**
+ * <p>For more information please see {@link
PipelineOptionsFactory#fromMap(Map)}.</p>
+ *
+ * <p>This method uses {@link Properties#stringPropertyNames()} to ensure to
filter
+ * only {@link String} key/values.</p>
+ *
+ * @param properties the properties to use to create the pipeline options
instance.
+ * @return a pipeline options instance based on the specified properties.
+ */
+ @Experimental
+ public static PipelineOptions fromProperties(final Properties properties) {
+ checkArgument(properties != null, "properties should not be null.");
+ return fromMap(properties.stringPropertyNames()
+ .stream()
+ .collect(toMap(identity(), properties::getProperty)));
+ }
+
+ /**
+ * For more information please see {@link
PipelineOptionsFactory#fromMap(String, Map)}
+ * and {@link PipelineOptionsFactory#fromProperties(Properties)}.
+ *
+ * @param prefix the prefix filter applied on system properties keys.
+ * @param properties the properties to use to create the pipeline options
instance.
+ * @return a pipeline options instance based on the specified properties
+ * filtered with the specified prefix.
+ */
+ @Experimental
+ public static PipelineOptions fromProperties(final String prefix, final
Properties properties) {
+ checkArgument(prefix != null, "prefix should not be null.");
+ checkArgument(properties != null, "properties should not be null.");
+ final Properties instantiationProperties = properties.stringPropertyNames()
+ .stream()
+ .filter(k -> k.startsWith(prefix))
+ .collect(
+ Properties::new,
+ (p, k) -> p.setProperty(k.substring(prefix.length()),
properties.getProperty(k)),
+ Hashtable::putAll);
+ return fromProperties(instantiationProperties);
+ }
+
+ /**
+ * <p>For more information please see
+ * {@link PipelineOptionsFactory#fromProperties(String, Properties)}.</p>
+ *
+ * <p>The intent of this method is to let you extract from system properties
+ * key/values to build {@link PipelineOptions} very quickly without
duplicating
+ * the mapping each time you need to do it yourself.</p>
+ *
+ * <p>If you don't develop a library or have a special case,
+ * it is recommended to use {@link
PipelineOptionsFactory#fromSystemProperties()}
+ * as a default factory.</p>
+ *
+ * @param prefix the not null prefix filter applied on system properties
keys.
+ * @return a pipeline options instance based on system properties key/values
+ * and filtered with a custom prefix.
+ */
+ @Experimental
+ public static PipelineOptions fromSystemProperties(final String prefix) {
+ checkArgument(prefix != null, "prefix should not be null.");
+ return fromProperties(prefix, System.getProperties());
+ }
+
+ /**
+ * <p>For more information please see
+ * {@link PipelineOptionsFactory#fromSystemProperties(String)}.</p>
+ *
+ * <p>This method will filter system properties based on the conventional
prefix
+ * <pre>beam.</pre> to create a {@link PipelineOptions} instance from the
filtered
+ * key/values.
+ *
+ * @return a pipeline options instance based on system properties
+ * filtered based on <code>beam.</code> prefix.
+ */
+ @Experimental
+ public static PipelineOptions fromSystemProperties() {
+ return fromSystemProperties("beam.");
+ }
}
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
index be92f2ac4d0..7560d7e00d7 100644
---
a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.options;
+import static java.util.Collections.singletonList;
import static java.util.Locale.ROOT;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
@@ -55,6 +56,7 @@
import java.io.PrintStream;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.Set;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
@@ -1797,6 +1799,82 @@ public void serialize(JacksonIncompatible
jacksonIncompatible, JsonGenerator jso
}
}
+ @Test
+ public void testPipelineOptionsFactoryFromProperties() {
+ assertEquals("testAppName", PipelineOptionsFactory.fromProperties(new
Properties() {{
+ put("appName", "testAppName");
+ }}).as(ApplicationNameOptions.class).getAppName());
+ }
+
+ @Test
+ public void testPipelineOptionsFactoryFromSystemProperties() {
+ final String prefix = getClass().getName() +
".testPipelineOptionsFactoryFromSystemProperties.";
+ System.setProperty(prefix + "appName", "testAppName");
+ assertEquals("testAppName",
+ PipelineOptionsFactory.fromSystemProperties(prefix)
+ .as(ApplicationNameOptions.class).getAppName());
+ System.clearProperty(prefix + "appName");
+ }
+
+ @Test
+ public void testPipelineOptionsFactoryFromPropertiesWithPrefix() {
+
PipelineOptionsFactory.register(PipelineOptionsFactoryFromPropertiesTestOptions.class);
+ { // expected case, set and not set
+ final PipelineOptions options =
+ PipelineOptionsFactory.fromProperties("prefix.", new Properties() {{
+ put("prefix.appName", "testAppName");
+ put("prefix.option", "test");
+ }});
+ final ApplicationNameOptions opts =
options.as(ApplicationNameOptions.class);
+ assertEquals("testAppName", opts.getAppName());
+ assertNull(opts.getTempLocation());
+ assertEquals(
+ "test",
+
options.as(PipelineOptionsFactoryFromPropertiesTestOptions.class).getOption());
+ }
+ { // non string value - nobody should care about that case except us as a
testing coverage
+ final PipelineOptionsFactoryFromPropertiesTestOptions opts =
+ PipelineOptionsFactory.fromProperties("prefix.", new
Properties() {{
+ put("prefix.appName", singletonList("test"));
+ }}).as(PipelineOptionsFactoryFromPropertiesTestOptions.class);
+ assertNull(opts.getOption());
+ }
+ { // non string key - nobody should care about that case except us as a
testing coverage
+ final PipelineOptionsFactoryFromPropertiesTestOptions opts =
+ PipelineOptionsFactory.fromProperties("prefix.", new
Properties() {{
+ put(singletonList("prefix.appName"), "test");
+ }}).as(PipelineOptionsFactoryFromPropertiesTestOptions.class);
+ assertNull(opts.getOption());
+ }
+ }
+
+ /** Used to test properties typing handling. */
+ public interface PipelineOptionsFactoryFromPropertiesTestOptions extends
PipelineOptions {
+ @Description("A test option.")
+ String getOption();
+ void setOption(String b);
+ }
+
+ @Test
+ public void testPipelineOptionsFactoryFromPropertiesWithLongPrefix() {
+ assertEquals(
+ "testAppName",
+ PipelineOptionsFactory.fromProperties("org.apache.beam.", new
Properties() {{
+ put("org.apache.beam.appName", "testAppName");
+ }}).as(ApplicationNameOptions.class).getAppName());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testPipelineOptionsFactoryFromNullProperties() {
+ PipelineOptionsFactory.fromProperties(null);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testPipelineOptionsFactoryFromPropertiesAndNullPrefix() {
+ PipelineOptionsFactory.fromProperties(null, new Properties() {{
+ put("appName", "testAppName");
+ }}).as(ApplicationNameOptions.class).getAppName();
+ }
/** Used to test that the thread context class loader is used when creating
proxies. */
public interface ClassLoaderTestOptions extends PipelineOptions {
@Default.Boolean(true)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 140691)
Time Spent: 11.5h (was: 11h 20m)
> Support system properties source for pipeline options
> -----------------------------------------------------
>
> Key: BEAM-3702
> URL: https://issues.apache.org/jira/browse/BEAM-3702
> Project: Beam
> Issue Type: Task
> Components: sdk-java-core
> Reporter: Romain Manni-Bucau
> Assignee: Romain Manni-Bucau
> Priority: Major
> Time Spent: 11.5h
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)