APEXCORE-105 Introduce module meta Inject properties through xml file on modules.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/b0360d45 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/b0360d45 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/b0360d45 Branch: refs/heads/master Commit: b0360d45a361e462124db9c3000977987ca830e6 Parents: 2f1e1df Author: Tushar R. Gosavi <[email protected]> Authored: Tue Oct 6 13:48:53 2015 +0530 Committer: Tushar R. Gosavi <[email protected]> Committed: Tue Dec 22 01:16:51 2015 +0530 ---------------------------------------------------------------------- api/src/main/java/com/datatorrent/api/DAG.java | 2 + .../stram/codec/LogicalPlanSerializer.java | 2 +- .../stram/plan/logical/LogicalPlan.java | 120 ++++++++++- .../plan/logical/LogicalPlanConfiguration.java | 99 +++++++-- .../stram/webapp/StramWebServices.java | 2 +- .../logical/module/TestModuleProperties.java | 58 +++++ .../stram/plan/logical/module/TestModules.java | 216 +++++++++++++++++++ 7 files changed, 472 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b0360d45/api/src/main/java/com/datatorrent/api/DAG.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/com/datatorrent/api/DAG.java b/api/src/main/java/com/datatorrent/api/DAG.java index abe2954..1dce402 100644 --- a/api/src/main/java/com/datatorrent/api/DAG.java +++ b/api/src/main/java/com/datatorrent/api/DAG.java @@ -164,6 +164,8 @@ public interface DAG extends DAGContext, Serializable { String getName(); + Module getModule(); + InputPortMeta getMeta(Operator.InputPort<?> port); OutputPortMeta getMeta(Operator.OutputPort<?> port); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b0360d45/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java b/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java index 9e5ac04..90dd2b5 100644 --- a/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java +++ b/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java @@ -212,7 +212,7 @@ public class LogicalPlanSerializer extends JsonSerializer<LogicalPlan> String operatorKey = LogicalPlanConfiguration.OPERATOR_PREFIX + operatorMeta.getName(); Operator operator = operatorMeta.getOperator(); props.setProperty(operatorKey + "." + LogicalPlanConfiguration.OPERATOR_CLASSNAME, operator.getClass().getName()); - BeanMap operatorProperties = LogicalPlanConfiguration.getOperatorProperties(operator); + BeanMap operatorProperties = LogicalPlanConfiguration.getObjectProperties(operator); @SuppressWarnings("rawtypes") Iterator entryIterator = operatorProperties.entryIterator(); while (entryIterator.hasNext()) { http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b0360d45/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java index cca45d8..53e81bc 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java @@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.datatorrent.api.*; @@ -48,7 +49,6 @@ import com.datatorrent.api.Operator.Unifier; import com.datatorrent.api.annotation.InputPortFieldAnnotation; import com.datatorrent.api.annotation.OperatorAnnotation; import com.datatorrent.api.annotation.OutputPortFieldAnnotation; - import com.datatorrent.common.experimental.AppData; import com.datatorrent.common.metric.MetricsAggregator; import com.datatorrent.common.metric.SingleMetricAggregator; @@ -1067,7 +1067,7 @@ public class LogicalPlan implements Serializable, DAG public <T extends Operator> T addOperator(String name, T operator) { if (operators.containsKey(name)) { - if (operators.get(name) == (Object)operator) { + if (operators.get(name).operator == operator) { return operator; } throw new IllegalArgumentException("duplicate operator id: " + operators.get(name)); @@ -1079,16 +1079,109 @@ public class LogicalPlan implements Serializable, DAG return operator; } - @Override - public <T extends Module> T addModule(String name, Class<T> moduleClass) + public final class ModuleMeta implements DAG.ModuleMeta, Serializable { - throw new UnsupportedOperationException("Modules are not supported"); + private final LinkedHashMap<InputPortMeta, StreamMeta> inputStreams = new LinkedHashMap<>(); + private final LinkedHashMap<OutputPortMeta, StreamMeta> outputStreams = new LinkedHashMap<>(); + private final Attribute.AttributeMap attributes; + @SuppressWarnings("unused") + private final int id; + @NotNull + private final String name; + private transient Integer nindex; // for cycle detection + private transient Integer lowlink; // for cycle detection + private transient Module module; + + public ModuleMeta(String name, Module module) + { + this(name, module, new DefaultAttributeMap()); + } + + public ModuleMeta(String name, Module module, DefaultAttributeMap attributeMap) + { + LOG.debug("Initializing {} as {}", name, module.getClass().getName()); + this.name = name; + this.module = module; + this.id = logicalOperatorSequencer.decrementAndGet(); + this.attributes = attributeMap; + } + + @Override + public String getName() + { + return name; + } + + @Override + public Module getModule() + { + return module; + } + + @Override + public DAG.InputPortMeta getMeta(InputPort<?> port) + { + return null; + } + + @Override + public DAG.OutputPortMeta getMeta(OutputPort<?> port) + { + return null; + } + + @Override + public Attribute.AttributeMap getAttributes() + { + return null; + } + + @Override + public <T> T getValue(Attribute<T> key) + { + return null; + } + + @Override + public void setCounters(Object counters) + { + + } + + @Override + public void sendMetrics(Collection<String> metricNames) + { + + } } + public transient Map<String, ModuleMeta> modules = Maps.newHashMap(); + @Override public <T extends Module> T addModule(String name, T module) { - throw new UnsupportedOperationException("Modules are not supported"); + if (modules.containsKey(name)) { + if (modules.get(name).module == module) { + return module; + } + throw new IllegalArgumentException("duplicate module is: " + modules.get(name)); + } + ModuleMeta meta = new ModuleMeta(name, module); + modules.put(name, meta); + return module; + } + + @Override + public <T extends Module> T addModule(String name, Class<T> clazz) + { + T instance; + try { + instance = clazz.newInstance(); + } catch (Exception ex) { + throw new IllegalArgumentException(ex); + } + addModule(name, instance); + return instance; } public void removeOperator(Operator operator) @@ -1231,6 +1324,10 @@ public class LogicalPlan implements Serializable, DAG return Collections.unmodifiableCollection(this.operators.values()); } + public Collection<ModuleMeta> getAllModules() { + return Collections.unmodifiableCollection(this.modules.values()); + } + public Collection<StreamMeta> getAllStreams() { return Collections.unmodifiableCollection(this.streams.values()); @@ -1242,10 +1339,9 @@ public class LogicalPlan implements Serializable, DAG return this.operators.get(operatorName); } - @Override public ModuleMeta getModuleMeta(String moduleName) { - throw new UnsupportedOperationException("Modules are not supported"); + return null; } @Override @@ -1260,10 +1356,14 @@ public class LogicalPlan implements Serializable, DAG throw new IllegalArgumentException("Operator not associated with the DAG: " + operator); } - @Override public ModuleMeta getMeta(Module module) { - throw new UnsupportedOperationException("Modules are not supported"); + for (ModuleMeta m : getAllModules()) { + if (m.module == module) { + return m; + } + } + throw new IllegalArgumentException("Module not associated with the DAG: " + module); } public int getMaxContainerCount() http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b0360d45/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java index 60bbdbe..9bbe85c 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java @@ -18,7 +18,6 @@ */ package com.datatorrent.stram.plan.logical; - import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; @@ -32,7 +31,6 @@ import java.lang.reflect.Type; import java.util.*; import java.util.Map.Entry; - import javax.validation.ValidationException; import com.google.common.annotations.VisibleForTesting; @@ -47,6 +45,7 @@ import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.commons.beanutils.BeanMap; import org.apache.commons.beanutils.BeanUtils; import org.apache.commons.collections.CollectionUtils; @@ -61,10 +60,10 @@ import com.datatorrent.api.Context.DAGContext; import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.Context.PortContext; import com.datatorrent.api.annotation.ApplicationAnnotation; - import com.datatorrent.stram.StramUtils; import com.datatorrent.stram.client.StramClientUtils; import com.datatorrent.stram.plan.logical.LogicalPlan.InputPortMeta; +import com.datatorrent.stram.plan.logical.LogicalPlan.ModuleMeta; import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta; import com.datatorrent.stram.plan.logical.LogicalPlan.OutputPortMeta; import com.datatorrent.stram.plan.logical.LogicalPlan.StreamMeta; @@ -2114,6 +2113,10 @@ public class LogicalPlanConfiguration { if (dag.getAttributes().get(Context.DAGContext.APPLICATION_NAME) == null) { dag.setAttribute(Context.DAGContext.APPLICATION_NAME, appName); } + + // Expand the modules within the dag recursively + setModuleProperties(dag, appName); + // inject external operator configuration setOperatorConfiguration(dag, appConfs, appName); setStreamConfiguration(dag, appConfs, appName); @@ -2138,7 +2141,7 @@ public class LogicalPlanConfiguration { public Map<String, String> getProperties(OperatorMeta ow, String appName) { List<AppConf> appConfs = stramConf.getMatchingChildConf(appName, StramElement.APPLICATION); List<OperatorConf> opConfs = getMatchingChildConf(appConfs, ow.getName(), StramElement.OPERATOR); - return getProperties(ow, opConfs, appName); + return getProperties(getPropertyArgs(ow), opConfs, appName); } private Map<String,String> getApplicationProperties(List<AppConf> appConfs){ @@ -2153,17 +2156,17 @@ public class LogicalPlanConfiguration { /** * Get the configuration opProps for the given operator. * These can be operator specific settings or settings from matching templates. - * @param ow + * @param pa * @param opConfs * @param appName */ - private Map<String, String> getProperties(OperatorMeta ow, List<OperatorConf> opConfs, String appName) + private Map<String, String> getProperties(PropertyArgs pa, List<OperatorConf> opConfs, String appName) { Map<String, String> opProps = Maps.newHashMap(); Map<String, TemplateConf> templates = stramConf.getChildren(StramElement.TEMPLATE); // list of all templates that match operator, ordered by priority if (!templates.isEmpty()) { - TreeMap<Integer, TemplateConf> matchingTemplates = getMatchingTemplates(ow, appName, templates); + TreeMap<Integer, TemplateConf> matchingTemplates = getMatchingTemplates(pa, appName, templates); if (matchingTemplates != null && !matchingTemplates.isEmpty()) { // combined map of prioritized template settings for (TemplateConf t : matchingTemplates.descendingMap().values()) { @@ -2197,23 +2200,46 @@ public class LogicalPlanConfiguration { return refTemplates; } + private static class PropertyArgs + { + String name; + String className; + + public PropertyArgs(String name, String className) + { + this.name = name; + this.className = className; + } + } + + private PropertyArgs getPropertyArgs(OperatorMeta om) + { + return new PropertyArgs(om.getName(), om.getOperator().getClass().getName()); + } + + private PropertyArgs getPropertyArgs(ModuleMeta mm) + { + return new PropertyArgs(mm.getName(), mm.getModule().getClass().getName()); + } + /** * Produce the collections of templates that apply for the given id. - * @param ow + * @param pa * @param appName * @param templates * @return TreeMap<Integer, TemplateConf> */ - private TreeMap<Integer, TemplateConf> getMatchingTemplates(OperatorMeta ow, String appName, Map<String, TemplateConf> templates) { + private TreeMap<Integer, TemplateConf> getMatchingTemplates(PropertyArgs pa, String appName, Map<String, TemplateConf> templates) + { TreeMap<Integer, TemplateConf> tm = Maps.newTreeMap(); for (TemplateConf t : templates.values()) { - if ((t.idRegExp != null && ow.getName().matches(t.idRegExp))) { + if ((t.idRegExp != null && pa.name.matches(t.idRegExp))) { tm.put(1, t); } else if (appName != null && t.appNameRegExp != null && appName.matches(t.appNameRegExp)) { tm.put(2, t); } else if (t.classNameRegExp != null - && ow.getOperator().getClass().getName().matches(t.classNameRegExp)) { + && pa.className.matches(t.classNameRegExp)) { tm.put(3, t); } } @@ -2238,6 +2264,26 @@ public class LogicalPlanConfiguration { } } + /** + * Generic helper function to inject properties on the object. + * + * @param obj + * @param properties + * @param <T> + * @return + */ + public static <T> T setObjectProperties(T obj, Map<String, String> properties) + { + try { + BeanUtils.populate(obj, properties); + return obj; + } catch (IllegalAccessException e) { + throw new IllegalArgumentException("Error setting operator properties", e); + } catch (InvocationTargetException e) { + throw new IllegalArgumentException("Error setting operator properties", e); + } + } + public static StreamingApplication setApplicationProperties(StreamingApplication application, Map<String, String> properties) { try { @@ -2249,9 +2295,9 @@ public class LogicalPlanConfiguration { } } - public static BeanMap getOperatorProperties(Operator operator) + public static BeanMap getObjectProperties(Object obj) { - return new BeanMap(operator); + return new BeanMap(obj); } /** @@ -2266,12 +2312,26 @@ public class LogicalPlanConfiguration { List<AppConf> appConfs = stramConf.getMatchingChildConf(applicationName, StramElement.APPLICATION); for (OperatorMeta ow : dag.getAllOperators()) { List<OperatorConf> opConfs = getMatchingChildConf(appConfs, ow.getName(), StramElement.OPERATOR); - Map<String, String> opProps = getProperties(ow, opConfs, applicationName); + Map<String, String> opProps = getProperties(getPropertyArgs(ow), opConfs, applicationName); setOperatorProperties(ow.getOperator(), opProps); } } /** + * Set any properties from configuration on the modules in the DAG. This + * method may throw unchecked exception if the configuration contains + * properties that are invalid for a module. + * + * @param dag + * @param applicationName + */ + public void setModuleProperties(LogicalPlan dag, String applicationName) + { + List<AppConf> appConfs = stramConf.getMatchingChildConf(applicationName, StramElement.APPLICATION); + setModuleConfiguration(dag, appConfs, applicationName); + } + + /** * Set the application configuration. * @param dag * @param appName @@ -2298,7 +2358,7 @@ public class LogicalPlanConfiguration { // Set the operator attributes setAttributes(opConfs, ow.getAttributes()); // Set the operator opProps - Map<String, String> opProps = getProperties(ow, opConfs, appName); + Map<String, String> opProps = getProperties(getPropertyArgs(ow), opConfs, appName); setOperatorProperties(ow.getOperator(), opProps); // Set the port attributes @@ -2327,6 +2387,15 @@ public class LogicalPlanConfiguration { } } + private void setModuleConfiguration(final LogicalPlan dag, List<AppConf> appConfs, String appName) + { + for (final ModuleMeta mw : dag.getAllModules()) { + List<OperatorConf> opConfs = getMatchingChildConf(appConfs, mw.getName(), StramElement.OPERATOR); + Map<String, String> opProps = getProperties(getPropertyArgs(mw), opConfs, appName); + setObjectProperties(mw.getModule(), opProps); + } + } + private void setStreamConfiguration(LogicalPlan dag, List<AppConf> appConfs, String appAlias) { for (StreamMeta sm : dag.getAllStreams()) { List<StreamConf> smConfs = getMatchingChildConf(appConfs, sm.getName(), StramElement.STREAM); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b0360d45/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java b/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java index de085cd..6fdba00 100644 --- a/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java +++ b/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java @@ -715,7 +715,7 @@ public class StramWebServices throw new NotFoundException(); } - BeanMap operatorProperties = LogicalPlanConfiguration.getOperatorProperties(logicalOperator.getOperator()); + BeanMap operatorProperties = LogicalPlanConfiguration.getObjectProperties(logicalOperator.getOperator()); Map<String, Object> m = new HashMap<String, Object>(); @SuppressWarnings("rawtypes") Iterator entryIterator = operatorProperties.entryIterator(); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b0360d45/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleProperties.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleProperties.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleProperties.java new file mode 100644 index 0000000..7951e26 --- /dev/null +++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleProperties.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.stram.plan.logical.module; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.stram.plan.logical.LogicalPlan; +import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration; + +public class TestModuleProperties +{ + @Test + public void testModuleProperties() + { + Configuration conf = new Configuration(false); + conf.set(StreamingApplication.DT_PREFIX + "operator.o1.prop.myStringProperty", "myStringPropertyValue"); + conf.set(StreamingApplication.DT_PREFIX + "operator.o2.prop.stringArrayField", "a,b,c"); + conf.set(StreamingApplication.DT_PREFIX + "operator.o2.prop.mapProperty.key1", "key1Val"); + conf.set(StreamingApplication.DT_PREFIX + "operator.o2.prop.mapProperty(key1.dot)", "key1dotVal"); + conf.set(StreamingApplication.DT_PREFIX + "operator.o2.prop.mapProperty(key2.dot)", "key2dotVal"); + + LogicalPlan dag = new LogicalPlan(); + TestModules.GenericModule o1 = dag.addModule("o1", new TestModules.GenericModule()); + TestModules.ValidationTestModule o2 = dag.addModule("o2", new TestModules.ValidationTestModule()); + + LogicalPlanConfiguration pb = new LogicalPlanConfiguration(conf); + + pb.setModuleProperties(dag, "testSetOperatorProperties"); + Assert.assertEquals("o1.myStringProperty", "myStringPropertyValue", o1.getMyStringProperty()); + Assert.assertArrayEquals("o2.stringArrayField", new String[]{"a", "b", "c"}, o2.getStringArrayField()); + + Assert.assertEquals("o2.mapProperty.key1", "key1Val", o2.getMapProperty().get("key1")); + Assert.assertEquals("o2.mapProperty(key1.dot)", "key1dotVal", o2.getMapProperty().get("key1.dot")); + Assert.assertEquals("o2.mapProperty(key2.dot)", "key2dotVal", o2.getMapProperty().get("key2.dot")); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b0360d45/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModules.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModules.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModules.java new file mode 100644 index 0000000..8fad613 --- /dev/null +++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModules.java @@ -0,0 +1,216 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.stram.plan.logical.module; + +import java.util.Map; + +import javax.validation.Valid; +import javax.validation.constraints.AssertTrue; +import javax.validation.constraints.Min; +import javax.validation.constraints.NotNull; +import javax.validation.constraints.Pattern; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; + +import com.google.common.collect.Maps; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Module; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.stram.engine.GenericOperatorProperty; + +public class TestModules +{ + + public static class GenericModule implements Module + { + private static final Logger LOG = LoggerFactory.getLogger(TestModules.class); + + public volatile Object inport1Tuple = null; + + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<Object> outport1 = new DefaultOutputPort<Object>(); + + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<Object> outport2 = new DefaultOutputPort<Object>(); + + private String emitFormat; + + public boolean booleanProperty; + + private String myStringProperty; + + private transient GenericOperatorProperty genericOperatorProperty = new GenericOperatorProperty("test"); + + public String getMyStringProperty() + { + return myStringProperty; + } + + public void setMyStringProperty(String myStringProperty) + { + this.myStringProperty = myStringProperty; + } + + public boolean isBooleanProperty() + { + return booleanProperty; + } + + public void setBooleanProperty(boolean booleanProperty) + { + this.booleanProperty = booleanProperty; + } + + public String propertySetterOnly; + + /** + * setter w/o getter defined + * + * @param v + */ + public void setStringPropertySetterOnly(String v) + { + this.propertySetterOnly = v; + } + + public String getEmitFormat() + { + return emitFormat; + } + + public void setEmitFormat(String emitFormat) + { + this.emitFormat = emitFormat; + } + + public GenericOperatorProperty getGenericOperatorProperty() + { + return genericOperatorProperty; + } + + public void setGenericOperatorProperty(GenericOperatorProperty genericOperatorProperty) + { + this.genericOperatorProperty = genericOperatorProperty; + } + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + LOG.debug("populateDAG of module called"); + } + } + + public static class ValidationTestModule implements Module + { + @NotNull + @Pattern(regexp = ".*malhar.*", message = "Value has to contain 'malhar'!") + private String stringField1; + + @Min(2) + private int intField1; + + @AssertTrue(message = "stringField1 should end with intField1") + private boolean isValidConfiguration() + { + return stringField1.endsWith(String.valueOf(intField1)); + } + + private String getterProperty2 = ""; + + @NotNull + public String getProperty2() + { + return getterProperty2; + } + + public void setProperty2(String s) + { + // annotations need to be on the getter + getterProperty2 = s; + } + + private String[] stringArrayField; + + public String[] getStringArrayField() + { + return stringArrayField; + } + + public void setStringArrayField(String[] stringArrayField) + { + this.stringArrayField = stringArrayField; + } + + public class Nested + { + @NotNull + private String property = ""; + + public String getProperty() + { + return property; + } + + public void setProperty(String property) + { + this.property = property; + } + + } + + @Valid + private final Nested nestedBean = new Nested(); + + private String stringProperty2; + + public String getStringProperty2() + { + return stringProperty2; + } + + public void setStringProperty2(String stringProperty2) + { + this.stringProperty2 = stringProperty2; + } + + private Map<String, String> mapProperty = Maps.newHashMap(); + + public Map<String, String> getMapProperty() + { + return mapProperty; + } + + public void setMapProperty(Map<String, String> mapProperty) + { + this.mapProperty = mapProperty; + } + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + + } + } + +}
