Repository: apex-core Updated Branches: refs/heads/master a9e4e053b -> b74d68967
APEXCORE-522 - Promote singleton usage pattern for String2String, Long2String and other StringCodecs Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/2e54dd08 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/2e54dd08 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/2e54dd08 Branch: refs/heads/master Commit: 2e54dd0813ebd8f926fede49aef3ae664c645afd Parents: 93f790a Author: Vlad Rozov <[email protected]> Authored: Wed Sep 7 09:12:54 2016 -0700 Committer: Vlad Rozov <[email protected]> Committed: Wed Dec 7 15:43:43 2016 -0800 ---------------------------------------------------------------------- .../java/com/datatorrent/api/Attribute.java | 18 +- .../main/java/com/datatorrent/api/Context.java | 57 +++--- .../java/com/datatorrent/api/StringCodec.java | 178 ++++++++++++++++++- .../com/datatorrent/api/Object2StringTest.java | 22 +-- .../partitioner/StatelessPartitionerTest.java | 3 +- .../stram/plan/logical/LogicalPlan.java | 8 +- 6 files changed, 220 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/2e54dd08/api/src/main/java/com/datatorrent/api/Attribute.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/com/datatorrent/api/Attribute.java b/api/src/main/java/com/datatorrent/api/Attribute.java index 8dede2f..2efc84f 100644 --- a/api/src/main/java/com/datatorrent/api/Attribute.java +++ b/api/src/main/java/com/datatorrent/api/Attribute.java @@ -29,12 +29,6 @@ import java.util.Set; import com.google.common.base.Throwables; -import com.datatorrent.api.StringCodec.Boolean2String; -import com.datatorrent.api.StringCodec.Enum2String; -import com.datatorrent.api.StringCodec.Integer2String; -import com.datatorrent.api.StringCodec.Long2String; -import com.datatorrent.api.StringCodec.String2String; - /** * Attribute represents the attribute which can be set on various components in the system. * @@ -295,17 +289,7 @@ public class Attribute<T> implements Serializable StringCodec<?> codec = null; if (attribute.defaultValue != null) { Class<?> klass = attribute.defaultValue.getClass(); - if (klass == String.class) { - codec = new String2String(); - } else if (klass == Integer.class) { - codec = new Integer2String(); - } else if (klass == Long.class) { - codec = new Long2String(); - } else if (klass == Boolean.class) { - codec = new Boolean2String(); - } else if (Enum.class.isAssignableFrom(klass)) { - codec = new Enum2String(klass); - } + codec = StringCodec.Factory.getInstance(klass); } if (codec != null) { Field codecField = Attribute.class.getDeclaredField("codec"); http://git-wip-us.apache.org/repos/asf/apex-core/blob/2e54dd08/api/src/main/java/com/datatorrent/api/Context.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/com/datatorrent/api/Context.java b/api/src/main/java/com/datatorrent/api/Context.java index 187bf08..3d3cffe 100644 --- a/api/src/main/java/com/datatorrent/api/Context.java +++ b/api/src/main/java/com/datatorrent/api/Context.java @@ -173,13 +173,13 @@ public interface Context * that can be received on the port. If it is unspecified the engine may use * a generic codec. */ - Attribute<StreamCodec<?>> STREAM_CODEC = new Attribute<StreamCodec<?>>(new Object2String<StreamCodec<?>>()); + Attribute<StreamCodec<?>> STREAM_CODEC = new Attribute<>(Object2String.<StreamCodec<?>>getInstance()); /** * Provides the tuple class which the port receives or emits. While this attribute is null by default, * whether it is needed or not is controlled through the port annotation. */ - Attribute<Class<?>> TUPLE_CLASS = new Attribute<>(new Class2String<>()); + Attribute<Class<?>> TUPLE_CLASS = new Attribute<>(Class2String.getInstance()); @SuppressWarnings("FieldNameHidesFieldInSuperclass") long serialVersionUID = AttributeMap.AttributeInitializer.initialize(PortContext.class); @@ -207,12 +207,12 @@ public interface Context * is equivalent to infinity; The operator hence will be attempted to be recovered indefinitely unless this value * is set to anything else. */ - Attribute<Integer> RECOVERY_ATTEMPTS = new Attribute<Integer>(new Integer2String()); + Attribute<Integer> RECOVERY_ATTEMPTS = new Attribute<>(Integer2String.getInstance()); /** * Specify a listener to process and optionally react to operator status updates. * The handler will be called for each physical operator as statistics are updated during heartbeat processing. */ - Attribute<Collection<StatsListener>> STATS_LISTENERS = new Attribute<Collection<StatsListener>>(new Collection2String<StatsListener>(",", new Object2String<StatsListener>(":"))); + Attribute<Collection<StatsListener>> STATS_LISTENERS = new Attribute<>(Collection2String.getInstance(",", Object2String.<StatsListener>getInstance(":"))); /** * Conveys whether the Operator is stateful or stateless. If the operator is stateless, no checkpointing is required * by the engine. The attribute is ignored when the operator was already declared stateless through the @@ -231,7 +231,7 @@ public interface Context /** * The options to be pass to JVM when launching the operator. Options such as java maximum heap size can be specified here. */ - Attribute<String> JVM_OPTIONS = new Attribute<String>(new String2String()); + Attribute<String> JVM_OPTIONS = new Attribute<>(String2String.getInstance()); /** * Attribute of the operator that tells the platform how many streaming windows make 1 application window. */ @@ -241,7 +241,7 @@ public interface Context * slided by duration determined using value of this attribute. Default value is null which is equivalent to that of {@link #APPLICATION_WINDOW_COUNT}. * The value should range between (0 - {@link #APPLICATION_WINDOW_COUNT}) */ - Attribute<Integer> SLIDE_BY_WINDOW_COUNT = new Attribute<Integer>(new Integer2String()); + Attribute<Integer> SLIDE_BY_WINDOW_COUNT = new Attribute<>(Integer2String.getInstance()); /** * Attribute of the operator that hints at the optimal checkpoint boundary. @@ -257,15 +257,15 @@ public interface Context * For example, the user may wish to specify a locality constraint for an input operator relative to its data source. * The attribute can then be set to the host name that is specified in the operator specific connect string property. */ - Attribute<String> LOCALITY_HOST = new Attribute<String>(new String2String()); + Attribute<String> LOCALITY_HOST = new Attribute<>(String2String.getInstance()); /** * Name of rack to directly control locality of an operator. Complementary to stream locality (RACK_LOCAL affinity). */ - Attribute<String> LOCALITY_RACK = new Attribute<String>(new String2String()); + Attribute<String> LOCALITY_RACK = new Attribute<>(String2String.getInstance()); /** * The agent which can be used to checkpoint the windows. */ - Attribute<StorageAgent> STORAGE_AGENT = new Attribute<StorageAgent>(new Object2String<StorageAgent>()); + Attribute<StorageAgent> STORAGE_AGENT = new Attribute<>(Object2String.<StorageAgent>getInstance()); /** * The payload processing mode for this operator - at most once, exactly once, or default at least once. * If the processing mode for an operator is specified as AT_MOST_ONCE and no processing mode is specified for the downstream @@ -293,27 +293,26 @@ public interface Context * If the attribute is not set and the operator implements Partitioner interface, then the instance of the operator * is used otherwise default default partitioning is used. */ - Attribute<Partitioner<? extends Operator>> PARTITIONER = new Attribute<Partitioner<? extends Operator>>(new Object2String<Partitioner<? extends Operator>>()); + Attribute<Partitioner<? extends Operator>> PARTITIONER = new Attribute<>(Object2String.<Partitioner<? extends Operator>>getInstance()); /** * Aggregates physical counters to a logical counter. * @deprecated use {@link #METRICS_AGGREGATOR} */ @Deprecated - Attribute<CountersAggregator> COUNTERS_AGGREGATOR = new Attribute<CountersAggregator>(new Object2String<CountersAggregator>()); + Attribute<CountersAggregator> COUNTERS_AGGREGATOR = new Attribute<>(Object2String.<CountersAggregator>getInstance()); /** * Aggregates metrics of physical instances of an operator. This handler is called with the metrics data of a * particular window from all the physical instances so that it can be aggregated into a logical view. */ - Attribute<AutoMetric.Aggregator> METRICS_AGGREGATOR = new Attribute<AutoMetric.Aggregator>(new Object2String<AutoMetric.Aggregator>()); + Attribute<AutoMetric.Aggregator> METRICS_AGGREGATOR = new Attribute<>(Object2String.<AutoMetric.Aggregator>getInstance()); /** * Provides dimension aggregations and time buckets information for logical metrics. The information provided * by this construct is conveyed to tracker application and influences the aggregations done on it by the tracker. */ - Attribute<AutoMetric.DimensionsScheme> METRICS_DIMENSIONS_SCHEME = new Attribute<AutoMetric.DimensionsScheme>(new - Object2String<AutoMetric.DimensionsScheme>()); + Attribute<AutoMetric.DimensionsScheme> METRICS_DIMENSIONS_SCHEME = new Attribute<>(Object2String.<AutoMetric.DimensionsScheme>getInstance()); /** * Return the operator runtime id. @@ -353,7 +352,7 @@ public interface Context /** * URL to the application's documentation. */ - Attribute<String> APPLICATION_DOC_LINK = new Attribute<String>(new String2String()); + Attribute<String> APPLICATION_DOC_LINK = new Attribute<>(String2String.getInstance()); /** * URL to the application's app data, if any. If not set, an empty string is the default. @@ -364,12 +363,12 @@ public interface Context * <code>"http://mynetwork.net/my/appdata/dashboard?appId=application_1355713111917_0002"</code>. * </p> */ - Attribute<String> APPLICATION_DATA_LINK = new Attribute<String>(new String2String()); + Attribute<String> APPLICATION_DATA_LINK = new Attribute<>(String2String.getInstance()); /** * Transport to push the stats and the metrics. * If using the built-in transport, please use an AutoMetricBuiltInTransport object */ - Attribute<AutoMetric.Transport> METRICS_TRANSPORT = new Attribute<>(new Object2String<AutoMetric.Transport>()); + Attribute<AutoMetric.Transport> METRICS_TRANSPORT = new Attribute<>(Object2String.<AutoMetric.Transport>getInstance()); /** * Application instance identifier. An application with the same name can run in multiple instances, each with a * unique identifier. The identifier is set by the client that submits the application and can be used in operators @@ -379,12 +378,12 @@ public interface Context * <code>application_1355713111917_0002</code>). Note that only the full id string uniquely identifies an application, * the integer offset will reset on RM restart. */ - Attribute<String> APPLICATION_ID = new Attribute<String>(new String2String()); + Attribute<String> APPLICATION_ID = new Attribute<>(String2String.getInstance()); /** * Application package source. If the application is launched using an app package, this attribute contains the * information of the app package. It is in the format of {user}|{appPackageName}|{appPackageVersion} */ - Attribute<String> APP_PACKAGE_SOURCE = new Attribute<String>(new String2String()); + Attribute<String> APP_PACKAGE_SOURCE = new Attribute<>(String2String.getInstance()); /** * Dump extra debug information in launcher, master and containers. */ @@ -392,7 +391,7 @@ public interface Context /** * The options to be pass to JVM when launching the containers. Options such as java maximum heap size can be specified here. */ - Attribute<String> CONTAINER_JVM_OPTIONS = new Attribute<String>(new String2String()); + Attribute<String> CONTAINER_JVM_OPTIONS = new Attribute<>(String2String.getInstance()); /** * The amount of memory to be requested for the application master. Not used in local mode. * Default value is 1GB. @@ -414,7 +413,7 @@ public interface Context /** * The path to store application dependencies, recording and other generated files for application master and containers. */ - Attribute<String> APPLICATION_PATH = new Attribute<String>(new String2String()); + Attribute<String> APPLICATION_PATH = new Attribute<>(String2String.getInstance()); /** * The size limit for a file where tuple recordings are stored. When tuples are being recorded they are stored * in files. When a file size reaches this limit a new file is created and tuples start getting stored in the new file. Default value is 128k. @@ -429,7 +428,7 @@ public interface Context /** * Address to which the application side connects to DT Gateway, in the form of host:port. This will override "dt.gateway.listenAddress" in the configuration. */ - Attribute<String> GATEWAY_CONNECT_ADDRESS = new Attribute<String>(new String2String()); + Attribute<String> GATEWAY_CONNECT_ADDRESS = new Attribute<>(String2String.getInstance()); /** * Whether or not gateway is expecting SSL connection. */ @@ -437,11 +436,11 @@ public interface Context /** * The username for logging in to the gateway, if authentication is enabled. */ - Attribute<String> GATEWAY_USER_NAME = new Attribute<String>(new String2String()); + Attribute<String> GATEWAY_USER_NAME = new Attribute<>(String2String.getInstance()); /** * The password for logging in to the gateway, if authentication is enabled. */ - Attribute<String> GATEWAY_PASSWORD = new Attribute<String>(new String2String()); + Attribute<String> GATEWAY_PASSWORD = new Attribute<>(String2String.getInstance()); /** * The timeout when connecting to the pubsub service in gateway */ @@ -494,18 +493,18 @@ public interface Context /** * The agent which can be used to find the jvm options for the container. */ - Attribute<ContainerOptConfigurator> CONTAINER_OPTS_CONFIGURATOR = new Attribute<ContainerOptConfigurator>(new Object2String<ContainerOptConfigurator>()); + Attribute<ContainerOptConfigurator> CONTAINER_OPTS_CONFIGURATOR = new Attribute<>(Object2String.<ContainerOptConfigurator>getInstance()); /** * The policy for enabling stram web services authentication.<br/> * See {@link StramHTTPAuthentication} for the different options.<br/> * Default value is StramHTTPAuthentication.FOLLOW_HADOOP_AUTH */ - Attribute<StramHTTPAuthentication> STRAM_HTTP_AUTHENTICATION = new Attribute<>(StramHTTPAuthentication.FOLLOW_HADOOP_AUTH, new StringCodec.Enum2String<>(StramHTTPAuthentication.class)); + Attribute<StramHTTPAuthentication> STRAM_HTTP_AUTHENTICATION = new Attribute<>(StramHTTPAuthentication.FOLLOW_HADOOP_AUTH, StringCodec.Enum2String.getInstance(StramHTTPAuthentication.class)); /** * The string codec map for classes that are to be set or get through properties as strings. * Only supports string codecs that have a constructor with no arguments */ - Attribute<Map<Class<?>, Class<? extends StringCodec<?>>>> STRING_CODECS = new Attribute<Map<Class<?>, Class<? extends StringCodec<?>>>>(new Map2String<Class<?>, Class<? extends StringCodec<?>>>(",", "=", new Class2String<Object>(), new Class2String<StringCodec<?>>())); + Attribute<Map<Class<?>, Class<? extends StringCodec<?>>>> STRING_CODECS = new Attribute<>(Map2String.getInstance(",", "=", Class2String.getInstance(), Class2String.<StringCodec<?>>getInstance())); /** * The number of consecutive container failures that should lead to @@ -522,7 +521,7 @@ public interface Context /** * Affinity rules for specifying affinity and anti-affinity between logical operators */ - Attribute<AffinityRulesSet> AFFINITY_RULES_SET = new Attribute<AffinityRulesSet>(new JsonStringCodec<AffinityRulesSet>(AffinityRulesSet.class)); + Attribute<AffinityRulesSet> AFFINITY_RULES_SET = new Attribute<>(JsonStringCodec.getInstance(AffinityRulesSet.class)); /** * Comma separated list of jar file dependencies to be deployed with the application. @@ -530,7 +529,7 @@ public interface Context * that are made available through the distributed file system to application master * and child containers. */ - Attribute<String> LIBRARY_JARS = new Attribute<>(new StringCodec.String2String()); + Attribute<String> LIBRARY_JARS = new Attribute<>(String2String.getInstance()); @SuppressWarnings(value = "FieldNameHidesFieldInSuperclass") long serialVersionUID = AttributeMap.AttributeInitializer.initialize(DAGContext.class); http://git-wip-us.apache.org/repos/asf/apex-core/blob/2e54dd08/api/src/main/java/com/datatorrent/api/StringCodec.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/com/datatorrent/api/StringCodec.java b/api/src/main/java/com/datatorrent/api/StringCodec.java index 2cd6a4f..d4a0a41 100644 --- a/api/src/main/java/com/datatorrent/api/StringCodec.java +++ b/api/src/main/java/com/datatorrent/api/StringCodec.java @@ -62,8 +62,44 @@ public interface StringCodec<T> */ String toString(T pojo); + class Factory + { + public static StringCodec<?> getInstance(Class<?> cls) + { + if (cls == String.class) { + return String2String.getInstance(); + } else if (cls == Integer.class) { + return Integer2String.getInstance(); + } else if (cls == Long.class) { + return Long2String.getInstance(); + } else if (cls == Boolean.class) { + return Boolean2String.getInstance(); + } else if (Enum.class.isAssignableFrom(cls)) { + return Enum2String.getInstance(cls); + } else { + return null; + } + } + } + class String2String implements StringCodec<String>, Serializable { + @SuppressWarnings("deprecation") + private static final String2String instance = new String2String(); + + public static StringCodec<String> getInstance() + { + return instance; + } + + /** + * @deprecated As of release 3.5.0, replaced by {@link #getInstance()} + */ + @Deprecated + public String2String() + { + } + @Override public String fromString(String string) { @@ -81,6 +117,22 @@ public interface StringCodec<T> class Integer2String implements StringCodec<Integer>, Serializable { + @SuppressWarnings("deprecation") + private static final Integer2String instance = new Integer2String(); + + public static StringCodec<Integer> getInstance() + { + return instance; + } + + /** + * @deprecated As of release 3.5.0, replaced by {@link #getInstance()} + */ + @Deprecated + public Integer2String() + { + } + @Override public Integer fromString(String string) { @@ -98,6 +150,22 @@ public interface StringCodec<T> class Long2String implements StringCodec<Long>, Serializable { + @SuppressWarnings("deprecation") + private static final Long2String instance = new Long2String(); + + public static StringCodec<Long> getInstance() + { + return instance; + } + + /** + * @deprecated As of release 3.5.0, replaced by {@link #getInstance()} + */ + @Deprecated + public Long2String() + { + } + @Override public Long fromString(String string) { @@ -115,6 +183,22 @@ public interface StringCodec<T> class Boolean2String implements StringCodec<Boolean>, Serializable { + @SuppressWarnings("deprecation") + private static final Boolean2String instance = new Boolean2String(); + + public static StringCodec<Boolean> getInstance() + { + return instance; + } + + /** + * @deprecated As of release 3.5.0, replaced by {@link #getInstance()} + */ + @Deprecated + public Boolean2String() + { + } + @Override public Boolean fromString(String string) { @@ -148,21 +232,52 @@ public interface StringCodec<T> */ class Object2String<T> implements StringCodec<T>, Serializable { + @SuppressWarnings("deprecation") + private static final Object2String instance = new Object2String(); + + public static <T> StringCodec<T> getInstance() + { + return instance; + } + + public static <T> StringCodec<T> getInstance(String separator) + { + return getInstance(separator, "="); + } + + @SuppressWarnings("deprecation") + public static <T> StringCodec<T> getInstance(String separator, String propertySeparator) + { + return new Object2String<>(separator, propertySeparator); + } + public final String separator; public final String propertySeparator; + /** + * @deprecated As of release 3.5.0, replaced by {@link #getInstance()} + */ + @SuppressWarnings("deprecation") + @Deprecated public Object2String() { - separator = ":"; - propertySeparator = "="; + this(":", "="); } + /** + * @deprecated As of release 3.5.0, replaced by {@link #getInstance(String)} + */ + @SuppressWarnings("deprecation") + @Deprecated public Object2String(String separator) { - this.separator = separator; - this.propertySeparator = "="; + this(separator, "="); } + /** + * @deprecated As of release 3.5.0, replaced by {@link #getInstance(String, String)} + */ + @Deprecated public Object2String(String separator, String propertySeparator) { this.separator = separator; @@ -219,11 +334,21 @@ public interface StringCodec<T> class Map2String<K, V> implements StringCodec<Map<K, V>>, Serializable { + @SuppressWarnings("deprecation") + public static <K, V> StringCodec<Map<K, V>> getInstance(String separator, String equal, StringCodec<K> keyCodec, StringCodec<V> valueCodec) + { + return new Map2String<>(separator, equal, keyCodec, valueCodec); + } + private final StringCodec<K> keyCodec; private final StringCodec<V> valueCodec; private final String separator; private final String equal; + /** + * @deprecated As of release 3.5.0, replaced by {@link #getInstance(String, String, StringCodec, StringCodec)} + */ + @Deprecated public Map2String(String separator, String equal, StringCodec<K> keyCodec, StringCodec<V> valueCodec) { this.equal = equal; @@ -276,9 +401,19 @@ public interface StringCodec<T> class Collection2String<T> implements StringCodec<Collection<T>>, Serializable { + @SuppressWarnings("deprecation") + public static <T> StringCodec<Collection<T>> getInstance(String separator, StringCodec<T> codec) + { + return new Collection2String<>(separator, codec); + } + private final String separator; private final StringCodec<T> codec; + /** + * @deprecated As of release 3.5.0, replaced by {@link #getInstance(String, StringCodec)} + */ + @Deprecated public Collection2String(String separator, StringCodec<T> codec) { this.separator = separator; @@ -334,6 +469,16 @@ public interface StringCodec<T> { private final Class<T> clazz; + @SuppressWarnings("deprecation") + public static Enum2String getInstance(Class clazz) + { + return new Enum2String(clazz); + } + + /** + * @deprecated As of release 3.5.0, replaced by {@link #getInstance(Class<T>)} + */ + @Deprecated public Enum2String(Class<T> clazz) { this.clazz = clazz; @@ -356,6 +501,21 @@ public interface StringCodec<T> class Class2String<T> implements StringCodec<Class<? extends T>>, Serializable { + @SuppressWarnings("deprecation") + private static final StringCodec instance = new Class2String<>(); + + public static <T> StringCodec<Class<? extends T>> getInstance() + { + return (StringCodec<Class<? extends T>>)instance; + } + + /** + * @deprecated As of release 3.5.0, replaced by {@link #getInstance()} + */ + public Class2String() + { + } + @Override @SuppressWarnings({"BroadCatchBlock", "TooBroadCatch"}) public Class<? extends T> fromString(String string) @@ -381,8 +541,18 @@ public interface StringCodec<T> class JsonStringCodec<T> implements StringCodec<T>, Serializable { private static final long serialVersionUID = 2513932518264776006L; + + @SuppressWarnings("deprecation") + public static <T> StringCodec<T> getInstance(Class<T> clazz) + { + return new JsonStringCodec<>(clazz); + } + Class<?> clazz; + /** + * @deprecated As of release 3.5.0, replaced by {@link #getInstance(Class)} + */ public JsonStringCodec(Class<T> clazz) { this.clazz = clazz; http://git-wip-us.apache.org/repos/asf/apex-core/blob/2e54dd08/api/src/test/java/com/datatorrent/api/Object2StringTest.java ---------------------------------------------------------------------- diff --git a/api/src/test/java/com/datatorrent/api/Object2StringTest.java b/api/src/test/java/com/datatorrent/api/Object2StringTest.java index e42a462..98c584b 100644 --- a/api/src/test/java/com/datatorrent/api/Object2StringTest.java +++ b/api/src/test/java/com/datatorrent/api/Object2StringTest.java @@ -20,17 +20,21 @@ package com.datatorrent.api; import org.hamcrest.BaseMatcher; import org.hamcrest.Description; +import org.junit.Before; import org.junit.Test; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; /** * This tests the Object2String codec */ public class Object2StringTest { + private StringCodec<TestBean> bean2String; + public static class TestBean { private int intVal; @@ -127,10 +131,16 @@ public class Object2StringTest } } + @Before + public void setup() + { + bean2String = StringCodec.Object2String.getInstance(); + assertTrue(bean2String instanceof StringCodec.Object2String); + } + @Test public void testBeanCodecWithoutConstructorWithoutProperty() { - StringCodec.Object2String<TestBean> bean2String = new StringCodec.Object2String<TestBean>(); String bean = TestBean.class.getName(); TestBean obj = bean2String.fromString(bean); assertEquals("validating the bean",obj,new TestBean()); @@ -139,7 +149,6 @@ public class Object2StringTest @Test public void testBeanCodecWithConstructorSet() { - StringCodec.Object2String<TestBean> bean2String = new StringCodec.Object2String<TestBean>(); String bean = TestBean.class.getName() + ":testVal"; TestBean obj = bean2String.fromString(bean); assertEquals("validating the bean", obj, new TestBean("testVal")); @@ -148,7 +157,6 @@ public class Object2StringTest @Test public void testBeanCodecWithConstructorPropertySet() { - StringCodec.Object2String<TestBean> bean2String = new StringCodec.Object2String<TestBean>(); String bean = TestBean.class.getName() + ":testVal:intVal=10:stringVal=strVal"; TestBean obj = bean2String.fromString(bean); TestBean expectedBean = new TestBean("testVal"); @@ -160,7 +168,6 @@ public class Object2StringTest @Test public void testBeanCodecWithConstructorSetEmptyProperties() { - StringCodec.Object2String<TestBean> bean2String = new StringCodec.Object2String<TestBean>(); String bean = TestBean.class.getName() + ":testVal:"; TestBean obj = bean2String.fromString(bean); assertEquals("validating the bean",obj,new TestBean("testVal")); @@ -169,7 +176,6 @@ public class Object2StringTest @Test public void testBeanCodecOnlyEmptyConstructor() { - StringCodec.Object2String<TestBean> bean2String = new StringCodec.Object2String<TestBean>(); String bean = TestBean.class.getName() + ":"; TestBean obj = bean2String.fromString(bean); assertEquals("validating the bean",obj,new TestBean()); @@ -178,7 +184,6 @@ public class Object2StringTest @Test public void testBeanCodecOnlyConstructor() { - StringCodec.Object2String<TestBean> bean2String = new StringCodec.Object2String<TestBean>(); String bean = TestBean.class.getName() + ": "; TestBean obj = bean2String.fromString(bean); assertEquals("validating the bean",obj,new TestBean(" ")); @@ -187,7 +192,6 @@ public class Object2StringTest @Test public void testBeanCodecEmptyConstructorEmptyProperty() { - StringCodec.Object2String<TestBean> bean2String = new StringCodec.Object2String<TestBean>(); String bean = TestBean.class.getName() + "::"; TestBean obj = bean2String.fromString(bean); assertEquals("validating the bean",obj,new TestBean()); @@ -196,7 +200,6 @@ public class Object2StringTest @Test public void testBeanCodecWithProperty() { - StringCodec.Object2String<TestBean> bean2String = new StringCodec.Object2String<TestBean>(); String bean = TestBean.class.getName() + "::intVal=1"; TestBean obj = bean2String.fromString(bean); TestBean expectedBean = new TestBean(""); @@ -207,7 +210,6 @@ public class Object2StringTest @Test public void testBeanCodecWithAllProperties() { - StringCodec.Object2String<TestBean> bean2String = new StringCodec.Object2String<TestBean>(); String bean = TestBean.class.getName() + "::intVal=1:stringVal=testStr:longVal=10"; TestBean obj = bean2String.fromString(bean); TestBean expectedBean = new TestBean("testStr"); @@ -219,7 +221,6 @@ public class Object2StringTest @Test public void testBeanWithWrongClassName() { - StringCodec.Object2String<TestBean> bean2String = new StringCodec.Object2String<TestBean>(); String bean = TestBean.class.getName() + "1::intVal=1"; try { bean2String.fromString(bean); @@ -237,7 +238,6 @@ public class Object2StringTest @Test public void testBeanFailure() { - StringCodec.Object2String<TestBean> bean2String = new StringCodec.Object2String<TestBean>(); String bean = TestBean.class.getName() + "::intVal=1:stringVal=hello:longVal=10"; TestBean obj = bean2String.fromString(bean); TestBean expectedBean = new TestBean("hello"); http://git-wip-us.apache.org/repos/asf/apex-core/blob/2e54dd08/common/src/test/java/com/datatorrent/common/partitioner/StatelessPartitionerTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/com/datatorrent/common/partitioner/StatelessPartitionerTest.java b/common/src/test/java/com/datatorrent/common/partitioner/StatelessPartitionerTest.java index 687957c..e7c4887 100644 --- a/common/src/test/java/com/datatorrent/common/partitioner/StatelessPartitionerTest.java +++ b/common/src/test/java/com/datatorrent/common/partitioner/StatelessPartitionerTest.java @@ -33,6 +33,7 @@ import com.datatorrent.api.Operator; import com.datatorrent.api.Operator.InputPort; import com.datatorrent.api.Partitioner; import com.datatorrent.api.Partitioner.Partition; +import com.datatorrent.api.StringCodec; import com.datatorrent.api.StringCodec.Object2String; public class StatelessPartitionerTest @@ -127,7 +128,7 @@ public class StatelessPartitionerTest @Test public void objectPropertyTest() { - Object2String<StatelessPartitioner<DummyOperator>> propertyReader = new Object2String<StatelessPartitioner<DummyOperator>>(); + StringCodec<StatelessPartitioner<DummyOperator>> propertyReader = Object2String.getInstance(); StatelessPartitioner<DummyOperator> partitioner = propertyReader.fromString("com.datatorrent.common.partitioner.StatelessPartitioner:3"); Assert.assertEquals(3, partitioner.getPartitionCount()); } http://git-wip-us.apache.org/repos/asf/apex-core/blob/2e54dd08/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java index f1ccaef..1371ce8 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java @@ -157,8 +157,8 @@ public class LogicalPlan implements Serializable, DAG public static Attribute<Boolean> FAST_PUBLISHER_SUBSCRIBER = new Attribute<>(false); public static Attribute<Long> HDFS_TOKEN_LIFE_TIME = new Attribute<>(604800000L); public static Attribute<Long> RM_TOKEN_LIFE_TIME = new Attribute<>(YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT); - public static Attribute<String> PRINCIPAL = new Attribute<String>(null, new StringCodec.String2String()); - public static Attribute<String> KEY_TAB_FILE = new Attribute<>((String)null, new StringCodec.String2String()); + public static Attribute<String> PRINCIPAL = new Attribute<>(null, StringCodec.String2String.getInstance()); + public static Attribute<String> KEY_TAB_FILE = new Attribute<>((String)null, StringCodec.String2String.getInstance()); public static Attribute<Double> TOKEN_REFRESH_ANTICIPATORY_FACTOR = new Attribute<>(0.7); /** * Comma separated list of archives to be deployed with the application. @@ -166,13 +166,13 @@ public class LogicalPlan implements Serializable, DAG * that are made available through the distributed file system to application master * and child containers. */ - public static Attribute<String> ARCHIVES = new Attribute<>(new StringCodec.String2String()); + public static Attribute<String> ARCHIVES = new Attribute<>(StringCodec.String2String.getInstance()); /** * Comma separated list of files to be deployed with the application. The launcher will include the files into the * final set of resources that are made available through the distributed file system to application master and child * containers. */ - public static Attribute<String> FILES = new Attribute<>(new StringCodec.String2String()); + public static Attribute<String> FILES = new Attribute<>(StringCodec.String2String.getInstance()); /** * The maximum number of containers (excluding the application master) that the application is allowed to request. * If the DAG plan requires less containers, remaining count won't be allocated from the resource manager.
