MLHR-1912 #resolve #comment fixed style violations
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/b9aa203d Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/b9aa203d Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/b9aa203d Branch: refs/heads/devel-3 Commit: b9aa203d7c411181665362dc0cee7420c7d61291 Parents: 2fe4bec Author: Chandni Singh <[email protected]> Authored: Thu Nov 19 08:30:41 2015 -0800 Committer: Chandni Singh <[email protected]> Committed: Thu Nov 19 11:16:17 2015 -0800 ---------------------------------------------------------------------- .../datatorrent/lib/algo/UniqueValueCount.java | 13 +- .../lib/codec/KryoSerializableStreamCodec.java | 4 +- .../AbstractDBLookupCacheBackedOperator.java | 16 ++- .../datatorrent/lib/db/cache/CacheManager.java | 27 +++-- .../datatorrent/lib/db/cache/CacheStore.java | 5 +- .../lib/db/jdbc/AbstractJdbcInputOperator.java | 15 +-- ...stractJdbcTransactionableOutputOperator.java | 20 ++-- .../db/jdbc/JDBCLookupCacheBackedOperator.java | 15 +-- .../lib/db/jdbc/JdbcPOJOInputOperator.java | 55 ++++++--- .../lib/db/jdbc/JdbcPOJOOutputOperator.java | 98 +++++++++------ .../com/datatorrent/lib/db/jdbc/JdbcStore.java | 14 +-- .../lib/db/jdbc/JdbcTransactionalStore.java | 36 +++--- .../lib/io/AbstractFTPInputOperator.java | 14 +-- .../lib/io/fs/AbstractFileOutputOperator.java | 23 ++-- .../lib/io/jms/AbstractJMSInputOperator.java | 85 +++++++------ .../lib/io/jms/JMSStringInputOperator.java | 10 +- .../datatorrent/lib/metric/AvgAggregator.java | 3 +- .../lib/metric/max/DoubleMaxAggregator.java | 3 +- .../lib/metric/max/FloatMaxAggregator.java | 3 +- .../lib/metric/max/IntMaxAggregator.java | 3 +- .../lib/metric/max/LongMaxAggregator.java | 1 - .../lib/metric/min/DoubleMinAggregator.java | 3 +- .../lib/metric/min/FloatMinAggregator.java | 3 +- .../lib/metric/min/IntMinAggregator.java | 3 +- .../lib/metric/min/LongMinAggregator.java | 3 +- .../lib/codec/KryoStreamCodecTest.java | 120 ++++++++++--------- .../lib/db/cache/CacheManagerTest.java | 2 +- .../jdbc/JDBCLookupCacheBackedOperatorTest.java | 33 ++--- .../lib/db/jdbc/JdbcOperatorTest.java | 43 ++++--- .../datatorrent/lib/db/jdbc/JdbcStoreTest.java | 7 +- .../lib/io/FTPStringInputOperatorTest.java | 7 +- .../io/fs/AbstractFileOutputOperatorTest.java | 4 +- .../lib/io/jms/JMSStringInputOperatorTest.java | 39 +++--- pom.xml | 2 +- 34 files changed, 388 insertions(+), 344 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/algo/UniqueValueCount.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/algo/UniqueValueCount.java b/library/src/main/java/com/datatorrent/lib/algo/UniqueValueCount.java index 3d5ed3b..6f8750d 100644 --- a/library/src/main/java/com/datatorrent/lib/algo/UniqueValueCount.java +++ b/library/src/main/java/com/datatorrent/lib/algo/UniqueValueCount.java @@ -24,15 +24,13 @@ import java.util.Set; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import com.datatorrent.lib.util.KeyValPair; - -import com.datatorrent.common.util.BaseOperator; - import com.datatorrent.api.Context; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.annotation.OperatorAnnotation; import com.datatorrent.api.annotation.Stateless; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.util.KeyValPair; /** * This operator counts the number of unique values corresponding to a key within a window. @@ -89,20 +87,21 @@ public class UniqueValueCount<K> extends BaseOperator @SuppressWarnings({"rawtypes", "unchecked"}) public Unifier<KeyValPair<K, Integer>> getUnifier() { - return (Unifier) new UniqueCountUnifier<K>(); + return (Unifier)new UniqueCountUnifier<K>(); } }; /** * The output port which emits key and set containing unique values */ - public final transient DefaultOutputPort<KeyValPair<K, Set<Object>>> outputValues = new DefaultOutputPort<KeyValPair<K, Set<Object>>>() + public final transient DefaultOutputPort<KeyValPair<K, Set<Object>>> outputValues = + new DefaultOutputPort<KeyValPair<K, Set<Object>>>() { @SuppressWarnings({"unchecked", "rawtypes"}) @Override public Unifier<KeyValPair<K, Set<Object>>> getUnifier() { - return (Unifier) new UniqueCountSetUnifier<K>(); + return (Unifier)new UniqueCountSetUnifier<K>(); } }; http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/codec/KryoSerializableStreamCodec.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/codec/KryoSerializableStreamCodec.java b/library/src/main/java/com/datatorrent/lib/codec/KryoSerializableStreamCodec.java index ecfa422..99e0a6e 100644 --- a/library/src/main/java/com/datatorrent/lib/codec/KryoSerializableStreamCodec.java +++ b/library/src/main/java/com/datatorrent/lib/codec/KryoSerializableStreamCodec.java @@ -28,6 +28,7 @@ import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; import com.google.common.base.Preconditions; + import com.datatorrent.api.StreamCodec; import com.datatorrent.netlet.util.Slice; @@ -56,7 +57,8 @@ public class KryoSerializableStreamCodec<T> implements StreamCodec<T>, Serializa } /** - * Registers a class with kryo. If the class of the tuple and its fields are registered then kryo serialization is more efficient. + * Registers a class with kryo. If the class of the tuple and its fields are registered then kryo serialization is + * more efficient. * * @param clazz class to register with Kryo. */ http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/db/cache/AbstractDBLookupCacheBackedOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/db/cache/AbstractDBLookupCacheBackedOperator.java b/library/src/main/java/com/datatorrent/lib/db/cache/AbstractDBLookupCacheBackedOperator.java index 1510adf..42e02d3 100644 --- a/library/src/main/java/com/datatorrent/lib/db/cache/AbstractDBLookupCacheBackedOperator.java +++ b/library/src/main/java/com/datatorrent/lib/db/cache/AbstractDBLookupCacheBackedOperator.java @@ -29,7 +29,6 @@ import com.datatorrent.api.Context; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.Operator; - import com.datatorrent.lib.db.Connectable; import com.datatorrent.lib.util.KeyValPair; @@ -53,7 +52,8 @@ import com.datatorrent.lib.util.KeyValPair; * @param <S> type of store * @since 0.9.1 */ -public abstract class AbstractDBLookupCacheBackedOperator<T, S extends Connectable> implements Operator, CacheManager.Backup +public abstract class AbstractDBLookupCacheBackedOperator<T, S extends Connectable> + implements Operator, CacheManager.Backup { @NotNull protected S store; @@ -83,11 +83,11 @@ public abstract class AbstractDBLookupCacheBackedOperator<T, S extends Connectab Object value = cacheManager.get(key); if (value != null) { - output.emit(new KeyValPair<Object, Object>(key, value)); + output.emit(new KeyValPair<>(key, value)); } } - public final transient DefaultOutputPort<KeyValPair<Object, Object>> output = new DefaultOutputPort<KeyValPair<Object, Object>>(); + public final transient DefaultOutputPort<KeyValPair<Object, Object>> output = new DefaultOutputPort<>(); @Override public void beginWindow(long l) @@ -107,8 +107,7 @@ public abstract class AbstractDBLookupCacheBackedOperator<T, S extends Connectab cacheManager.setBackup(this); try { cacheManager.initialize(); - } - catch (IOException e) { + } catch (IOException e) { throw new RuntimeException(e); } } @@ -118,8 +117,7 @@ public abstract class AbstractDBLookupCacheBackedOperator<T, S extends Connectab { try { cacheManager.close(); - } - catch (IOException e) { + } catch (IOException e) { LOG.error("closing manager", e); } } @@ -171,6 +169,6 @@ public abstract class AbstractDBLookupCacheBackedOperator<T, S extends Connectab */ protected abstract Object getKeyFromTuple(T tuple); - private final static Logger LOG = LoggerFactory.getLogger(AbstractDBLookupCacheBackedOperator.class); + private static final Logger LOG = LoggerFactory.getLogger(AbstractDBLookupCacheBackedOperator.class); } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/db/cache/CacheManager.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/db/cache/CacheManager.java b/library/src/main/java/com/datatorrent/lib/db/cache/CacheManager.java index 6d9f89d..2ea31f1 100644 --- a/library/src/main/java/com/datatorrent/lib/db/cache/CacheManager.java +++ b/library/src/main/java/com/datatorrent/lib/db/cache/CacheManager.java @@ -20,7 +20,12 @@ package com.datatorrent.lib.db.cache; import java.io.Closeable; import java.io.IOException; -import java.util.*; +import java.util.Calendar; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -36,15 +41,15 @@ import com.datatorrent.lib.db.KeyValueStore; /** * Manages primary and secondary stores.<br/> - * <p> - * It firsts checks the primary store for a key. If the primary store doesn't have the key, it queries the backup store and retrieves the value.<br/> + * It firsts checks the primary store for a key. If the primary store doesn't have the key, it queries the backup + * store and retrieves the value.<br/> * If the key was present in the backup store, its value is returned and also saved in the primary store. - * </p> - * <p> - * Typically primary store is faster but has limited size like memory and backup store is slower but unlimited like databases.<br/> - * Store Manager can also refresh the values of keys at a specified time every day. This time is in format HH:mm:ss Z.<br/> + * <p/> + * Typically primary store is faster but has limited size like memory and backup store is slower but unlimited like + * databases.<br/> + * Store Manager can also refresh the values of keys at a specified time every day. + * This time is in format HH:mm:ss Z.<br/> * This is not thread-safe. - * </p> * * @since 0.9.2 */ @@ -180,7 +185,7 @@ public class CacheManager implements Closeable /** * A primary store should also provide setting the value for a key. */ - public static interface Primary extends KeyValueStore + public interface Primary extends KeyValueStore { /** @@ -195,7 +200,7 @@ public class CacheManager implements Closeable * Backup store is queried when {@link Primary} doesn't contain a key.<br/> * It also provides data needed at startup.<br/> */ - public static interface Backup extends KeyValueStore + public interface Backup extends KeyValueStore { /** * <br>Backup stores are also used to initialize primary stores. This fetches initialization data.</br> @@ -206,6 +211,6 @@ public class CacheManager implements Closeable } @SuppressWarnings("unused") - private final static Logger LOG = LoggerFactory.getLogger(CacheManager.class); + private static final Logger LOG = LoggerFactory.getLogger(CacheManager.class); } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/db/cache/CacheStore.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/db/cache/CacheStore.java b/library/src/main/java/com/datatorrent/lib/db/cache/CacheStore.java index d10fe13..72063ee 100644 --- a/library/src/main/java/com/datatorrent/lib/db/cache/CacheStore.java +++ b/library/src/main/java/com/datatorrent/lib/db/cache/CacheStore.java @@ -114,8 +114,7 @@ public class CacheStore implements CacheManager.Primary CacheBuilder<Object, Object> cacheBuilder = CacheBuilder.newBuilder(); if (entryExpiryStrategy == ExpiryType.EXPIRE_AFTER_ACCESS) { cacheBuilder.expireAfterAccess(entryExpiryDurationInMillis, TimeUnit.MILLISECONDS); - } - else if (entryExpiryStrategy == ExpiryType.EXPIRE_AFTER_WRITE) { + } else if (entryExpiryStrategy == ExpiryType.EXPIRE_AFTER_WRITE) { cacheBuilder.expireAfterWrite(entryExpiryDurationInMillis, TimeUnit.MILLISECONDS); } cache = cacheBuilder.build(); @@ -186,7 +185,7 @@ public class CacheStore implements CacheManager.Primary /** * Strategies for time-based expiration of entries. */ - public static enum ExpiryType + public enum ExpiryType { /** * Only expire the entries after the specified duration has passed since the entry was last accessed by a read http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcInputOperator.java index 1cb7635..fe6b077 100644 --- a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcInputOperator.java @@ -35,7 +35,8 @@ import com.datatorrent.lib.db.AbstractStoreInputOperator; * and emits the data as tuples. * Subclasses should implement the methods required to read the data from the database. * <p> - * This is an abstract class. Sub-classes need to implement {@link #queryToRetrieveData()} and {@link #getTuple(ResultSet)}. + * This is an abstract class. Sub-classes need to implement + * {@link #queryToRetrieveData()} and {@link #getTuple(ResultSet)}. * </p> * @displayName Abstract JDBC Input * @category Input @@ -84,17 +85,14 @@ public abstract class AbstractJdbcInputOperator<T> extends AbstractStoreInputOpe outputPort.emit(tuple); } while (result.next()); - } - else { + } else { // No rows available wait for some time before retrying so as to not continuously slam the database Thread.sleep(waitForDataTimeout); } - } - catch (SQLException ex) { + } catch (SQLException ex) { store.disconnect(); throw new RuntimeException(String.format("Error while running query: %s", query), ex); - } - catch (InterruptedException ex) { + } catch (InterruptedException ex) { throw new RuntimeException(ex); } } @@ -106,8 +104,7 @@ public abstract class AbstractJdbcInputOperator<T> extends AbstractStoreInputOpe super.setup(context); try { queryStatement = store.getConnection().createStatement(); - } - catch (SQLException e) { + } catch (SQLException e) { throw new RuntimeException("creating query", e); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java index b57a9b1..77b76c1 100644 --- a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java @@ -22,8 +22,8 @@ import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.List; -import javax.annotation.Nonnull; import javax.validation.constraints.Min; +import javax.validation.constraints.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,7 +31,6 @@ import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; import com.datatorrent.api.Context; - import com.datatorrent.lib.db.AbstractPassThruTransactionableStoreOutputOperator; /** @@ -40,7 +39,8 @@ import com.datatorrent.lib.db.AbstractPassThruTransactionableStoreOutputOperator * <p> * This operator creates a transaction at the start of window, executes batches of sql updates, * and closes the transaction at the end of the window. Each tuple corresponds to an SQL update statement. - * The operator groups the updates in a batch and submits them with one call to the database. Batch processing improves performance considerably.<br/> + * The operator groups the updates in a batch and submits them with one call to the database. Batch processing + * improves performance considerably.<br/> * The size of a batch is configured by batchSize property. * </p> * <p> @@ -55,7 +55,8 @@ import com.datatorrent.lib.db.AbstractPassThruTransactionableStoreOutputOperator * @param <T> type of tuple * @since 0.9.4 */ -public abstract class AbstractJdbcTransactionableOutputOperator<T> extends AbstractPassThruTransactionableStoreOutputOperator<T, JdbcTransactionalStore> +public abstract class AbstractJdbcTransactionableOutputOperator<T> + extends AbstractPassThruTransactionableStoreOutputOperator<T, JdbcTransactionalStore> { protected static int DEFAULT_BATCH_SIZE = 1000; @@ -80,8 +81,7 @@ public abstract class AbstractJdbcTransactionableOutputOperator<T> extends Abstr super.setup(context); try { updateCommand = store.connection.prepareStatement(getUpdateCommand()); - } - catch (SQLException e) { + } catch (SQLException e) { throw new RuntimeException(e); } @@ -117,11 +117,9 @@ public abstract class AbstractJdbcTransactionableOutputOperator<T> extends Abstr } updateCommand.executeBatch(); updateCommand.clearBatch(); - } - catch (SQLException e) { + } catch (SQLException e) { throw new RuntimeException("processing batch", e); - } - finally { + } finally { batchStartIdx += tuples.size() - batchStartIdx; } } @@ -142,7 +140,7 @@ public abstract class AbstractJdbcTransactionableOutputOperator<T> extends Abstr * * @return the sql statement to update a tuple in the database. */ - @Nonnull + @NotNull protected abstract String getUpdateCommand(); /** http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCLookupCacheBackedOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCLookupCacheBackedOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCLookupCacheBackedOperator.java index 93006eb..7ca2ae1 100644 --- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCLookupCacheBackedOperator.java +++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCLookupCacheBackedOperator.java @@ -78,8 +78,7 @@ public abstract class JDBCLookupCacheBackedOperator<T> extends AbstractDBLookupC try { putStatement = store.connection.prepareStatement(insertQuery); getStatement = store.connection.prepareStatement(getQuery); - } - catch (SQLException e) { + } catch (SQLException e) { throw new RuntimeException(e); } } @@ -90,8 +89,7 @@ public abstract class JDBCLookupCacheBackedOperator<T> extends AbstractDBLookupC try { preparePutStatement(putStatement, key, value); putStatement.executeUpdate(); - } - catch (SQLException e) { + } catch (SQLException e) { throw new RuntimeException("while executing insert", e); } } @@ -103,8 +101,7 @@ public abstract class JDBCLookupCacheBackedOperator<T> extends AbstractDBLookupC prepareGetStatement(getStatement, key); ResultSet resultSet = getStatement.executeQuery(); return processResultSet(resultSet); - } - catch (SQLException e) { + } catch (SQLException e) { throw new RuntimeException("while fetching key", e); } } @@ -118,8 +115,7 @@ public abstract class JDBCLookupCacheBackedOperator<T> extends AbstractDBLookupC prepareGetStatement(getStatement, key); ResultSet resultSet = getStatement.executeQuery(); values.add(processResultSet(resultSet)); - } - catch (SQLException e) { + } catch (SQLException e) { throw new RuntimeException("while fetching keys", e); } } @@ -128,7 +124,8 @@ public abstract class JDBCLookupCacheBackedOperator<T> extends AbstractDBLookupC protected abstract void prepareGetStatement(PreparedStatement getStatement, Object key) throws SQLException; - protected abstract void preparePutStatement(PreparedStatement putStatement, Object key, Object value) throws SQLException; + protected abstract void preparePutStatement(PreparedStatement putStatement, Object key, Object value) + throws SQLException; protected abstract String fetchInsertQuery(); http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java index eafff3c..3aa6fac 100644 --- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java @@ -19,7 +19,14 @@ package com.datatorrent.lib.db.jdbc; import java.math.BigDecimal; -import java.sql.*; +import java.sql.Date; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Time; +import java.sql.Timestamp; +import java.sql.Types; import java.util.List; import java.util.Map; @@ -37,7 +44,6 @@ import com.datatorrent.api.Context; import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.Operator; import com.datatorrent.api.annotation.OutputPortFieldAnnotation; - import com.datatorrent.lib.util.FieldInfo; import com.datatorrent.lib.util.PojoUtils; @@ -49,8 +55,8 @@ import com.datatorrent.lib.util.PojoUtils; * * For eg. user can set the query property to a complex one : "select x1, x2 from t1, t2 where t1.x3 = t2.x3 ;"<br/> * - * This implementation is generic so it uses offset/limit mechanism for batching which is not optimal. Batching is most efficient - * when the tables/views are indexed and the query uses this information to retrieve data.<br/> + * This implementation is generic so it uses offset/limit mechanism for batching which is not optimal. Batching is + * most efficient when the tables/views are indexed and the query uses this information to retrieve data.<br/> * This can be achieved in sub-classes by overriding {@link #queryToRetrieveData()} and {@link #setRuntimeParams()}. * * @displayName Jdbc Input Operator @@ -58,7 +64,8 @@ import com.datatorrent.lib.util.PojoUtils; * @tags database, sql, pojo, jdbc * @since 2.1.0 */ -public class JdbcPOJOInputOperator extends AbstractJdbcInputOperator<Object> implements Operator.ActivationListener<Context.OperatorContext> +public class JdbcPOJOInputOperator extends AbstractJdbcInputOperator<Object> + implements Operator.ActivationListener<Context.OperatorContext> { private static int DEF_FETCH_SIZE = 100; @@ -363,53 +370,65 @@ public class JdbcPOJOInputOperator extends AbstractJdbcInputOperator<Object> imp switch (type) { case (Types.CHAR): case (Types.VARCHAR): - activeFieldInfo.setterOrGetter = PojoUtils.createSetter(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression(), + activeFieldInfo.setterOrGetter = PojoUtils.createSetter(pojoClass, + activeFieldInfo.fieldInfo.getPojoFieldExpression(), String.class); break; case (Types.BOOLEAN): - activeFieldInfo.setterOrGetter = PojoUtils.createSetterBoolean(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression()); + activeFieldInfo.setterOrGetter = PojoUtils.createSetterBoolean(pojoClass, + activeFieldInfo.fieldInfo.getPojoFieldExpression()); break; case (Types.TINYINT): - activeFieldInfo.setterOrGetter = PojoUtils.createSetterByte(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression()); + activeFieldInfo.setterOrGetter = PojoUtils.createSetterByte(pojoClass, + activeFieldInfo.fieldInfo.getPojoFieldExpression()); break; case (Types.SMALLINT): - activeFieldInfo.setterOrGetter = PojoUtils.createSetterShort(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression()); + activeFieldInfo.setterOrGetter = PojoUtils.createSetterShort(pojoClass, + activeFieldInfo.fieldInfo.getPojoFieldExpression()); break; case (Types.INTEGER): - activeFieldInfo.setterOrGetter = PojoUtils.createSetterInt(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression()); + activeFieldInfo.setterOrGetter = PojoUtils.createSetterInt(pojoClass, + activeFieldInfo.fieldInfo.getPojoFieldExpression()); break; case (Types.BIGINT): - activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression()); + activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(pojoClass, + activeFieldInfo.fieldInfo.getPojoFieldExpression()); break; case (Types.FLOAT): - activeFieldInfo.setterOrGetter = PojoUtils.createSetterFloat(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression()); + activeFieldInfo.setterOrGetter = PojoUtils.createSetterFloat(pojoClass, + activeFieldInfo.fieldInfo.getPojoFieldExpression()); break; case (Types.DOUBLE): - activeFieldInfo.setterOrGetter = PojoUtils.createGetterDouble(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression()); + activeFieldInfo.setterOrGetter = PojoUtils.createGetterDouble(pojoClass, + activeFieldInfo.fieldInfo.getPojoFieldExpression()); break; case Types.DECIMAL: - activeFieldInfo.setterOrGetter = PojoUtils.createSetter(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression(), - BigDecimal.class); + activeFieldInfo.setterOrGetter = PojoUtils.createSetter(pojoClass, + activeFieldInfo.fieldInfo.getPojoFieldExpression(), + BigDecimal.class); break; case Types.TIMESTAMP: - activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression()); + activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(pojoClass, + activeFieldInfo.fieldInfo.getPojoFieldExpression()); break; case Types.TIME: - activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression()); + activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(pojoClass, + activeFieldInfo.fieldInfo.getPojoFieldExpression()); break; case Types.DATE: - activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression()); + activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(pojoClass, + activeFieldInfo.fieldInfo.getPojoFieldExpression()); break; default: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOOutputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOOutputOperator.java index 5399c44..ffb4160 100644 --- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOOutputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOOutputOperator.java @@ -18,12 +18,32 @@ */ package com.datatorrent.lib.db.jdbc; +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Time; +import java.sql.Timestamp; +import java.sql.Types; +import java.util.List; + +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +import com.google.common.collect.Lists; + import com.datatorrent.api.Context; import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.Operator; import com.datatorrent.api.annotation.InputPortFieldAnnotation; - import com.datatorrent.lib.util.FieldInfo; import com.datatorrent.lib.util.PojoUtils; import com.datatorrent.lib.util.PojoUtils.Getter; @@ -34,18 +54,6 @@ import com.datatorrent.lib.util.PojoUtils.GetterInt; import com.datatorrent.lib.util.PojoUtils.GetterLong; import com.datatorrent.lib.util.PojoUtils.GetterShort; -import java.math.BigDecimal; -import java.sql.*; -import java.util.List; - -import javax.validation.constraints.NotNull; - -import org.apache.hadoop.classification.InterfaceStability.Evolving; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Lists; - /** * <p> * JdbcPOJOOutputOperator class.</p> @@ -57,7 +65,8 @@ import com.google.common.collect.Lists; * @since 2.1.0 */ @Evolving -public class JdbcPOJOOutputOperator extends AbstractJdbcTransactionableOutputOperator<Object> implements Operator.ActivationListener<OperatorContext> +public class JdbcPOJOOutputOperator extends AbstractJdbcTransactionableOutputOperator<Object> + implements Operator.ActivationListener<OperatorContext> { @NotNull private List<FieldInfo> fieldInfos; @@ -165,51 +174,51 @@ public class JdbcPOJOOutputOperator extends AbstractJdbcTransactionableOutputOpe switch (type) { case (Types.CHAR): case (Types.VARCHAR): - statement.setString(i + 1, ((Getter<Object, String>) activeFieldInfo.setterOrGetter).get(tuple)); + statement.setString(i + 1, ((Getter<Object, String>)activeFieldInfo.setterOrGetter).get(tuple)); break; case (Types.BOOLEAN): - statement.setBoolean(i + 1, ((GetterBoolean<Object>) activeFieldInfo.setterOrGetter).get(tuple)); + statement.setBoolean(i + 1, ((GetterBoolean<Object>)activeFieldInfo.setterOrGetter).get(tuple)); break; case (Types.TINYINT): - statement.setByte(i + 1, ((PojoUtils.GetterByte<Object>) activeFieldInfo.setterOrGetter).get(tuple)); + statement.setByte(i + 1, ((PojoUtils.GetterByte<Object>)activeFieldInfo.setterOrGetter).get(tuple)); break; case (Types.SMALLINT): - statement.setShort(i + 1, ((GetterShort<Object>) activeFieldInfo.setterOrGetter).get(tuple)); + statement.setShort(i + 1, ((GetterShort<Object>)activeFieldInfo.setterOrGetter).get(tuple)); break; case (Types.INTEGER): - statement.setInt(i + 1, ((GetterInt<Object>) activeFieldInfo.setterOrGetter).get(tuple)); + statement.setInt(i + 1, ((GetterInt<Object>)activeFieldInfo.setterOrGetter).get(tuple)); break; case (Types.BIGINT): - statement.setLong(i + 1, ((GetterLong<Object>) activeFieldInfo.setterOrGetter).get(tuple)); + statement.setLong(i + 1, ((GetterLong<Object>)activeFieldInfo.setterOrGetter).get(tuple)); break; case (Types.FLOAT): - statement.setFloat(i + 1, ((GetterFloat<Object>) activeFieldInfo.setterOrGetter).get(tuple)); + statement.setFloat(i + 1, ((GetterFloat<Object>)activeFieldInfo.setterOrGetter).get(tuple)); break; case (Types.DOUBLE): - statement.setDouble(i + 1, ((GetterDouble<Object>) activeFieldInfo.setterOrGetter).get(tuple)); + statement.setDouble(i + 1, ((GetterDouble<Object>)activeFieldInfo.setterOrGetter).get(tuple)); break; case Types.DECIMAL: - statement.setBigDecimal(i + 1, ((Getter<Object, BigDecimal>) activeFieldInfo.setterOrGetter).get(tuple)); + statement.setBigDecimal(i + 1, ((Getter<Object, BigDecimal>)activeFieldInfo.setterOrGetter).get(tuple)); break; case Types.TIMESTAMP: - statement.setTimestamp(i + 1, new Timestamp(((GetterLong<Object>) activeFieldInfo.setterOrGetter).get(tuple))); + statement.setTimestamp(i + 1, new Timestamp(((GetterLong<Object>)activeFieldInfo.setterOrGetter).get(tuple))); break; case Types.TIME: - statement.setTime(i + 1, new Time(((GetterLong<Object>) activeFieldInfo.setterOrGetter).get(tuple))); + statement.setTime(i + 1, new Time(((GetterLong<Object>)activeFieldInfo.setterOrGetter).get(tuple))); break; case Types.DATE: - statement.setDate(i + 1, new Date(((GetterLong<Object>) activeFieldInfo.setterOrGetter).get(tuple))); + statement.setDate(i + 1, new Date(((GetterLong<Object>)activeFieldInfo.setterOrGetter).get(tuple))); break; default: @@ -271,53 +280,64 @@ public class JdbcPOJOOutputOperator extends AbstractJdbcTransactionableOutputOpe switch (type) { case (Types.CHAR): case (Types.VARCHAR): - activeFieldInfo.setterOrGetter = PojoUtils.createGetter(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression(), + activeFieldInfo.setterOrGetter = PojoUtils.createGetter(pojoClass, + activeFieldInfo.fieldInfo.getPojoFieldExpression(), String.class); break; case (Types.BOOLEAN): - activeFieldInfo.setterOrGetter = PojoUtils.createGetterBoolean(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression()); + activeFieldInfo.setterOrGetter = PojoUtils.createGetterBoolean(pojoClass, + activeFieldInfo.fieldInfo.getPojoFieldExpression()); break; case (Types.TINYINT): - activeFieldInfo.setterOrGetter = PojoUtils.createGetterByte(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression()); + activeFieldInfo.setterOrGetter = PojoUtils.createGetterByte(pojoClass, + activeFieldInfo.fieldInfo.getPojoFieldExpression()); break; case (Types.SMALLINT): - activeFieldInfo.setterOrGetter = PojoUtils.createGetterShort(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression()); + activeFieldInfo.setterOrGetter = PojoUtils.createGetterShort(pojoClass, + activeFieldInfo.fieldInfo.getPojoFieldExpression()); break; case (Types.INTEGER): - activeFieldInfo.setterOrGetter = PojoUtils.createGetterInt(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression()); + activeFieldInfo.setterOrGetter = PojoUtils.createGetterInt(pojoClass, + activeFieldInfo.fieldInfo.getPojoFieldExpression()); break; case (Types.BIGINT): - activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression()); + activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass, + activeFieldInfo.fieldInfo.getPojoFieldExpression()); break; case (Types.FLOAT): - activeFieldInfo.setterOrGetter = PojoUtils.createGetterFloat(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression()); + activeFieldInfo.setterOrGetter = PojoUtils.createGetterFloat(pojoClass, + activeFieldInfo.fieldInfo.getPojoFieldExpression()); break; case (Types.DOUBLE): - activeFieldInfo.setterOrGetter = PojoUtils.createGetterDouble(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression()); + activeFieldInfo.setterOrGetter = PojoUtils.createGetterDouble(pojoClass, + activeFieldInfo.fieldInfo.getPojoFieldExpression()); break; case Types.DECIMAL: - activeFieldInfo.setterOrGetter = PojoUtils.createGetter(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression(), - BigDecimal.class); + activeFieldInfo.setterOrGetter = PojoUtils.createGetter(pojoClass, + activeFieldInfo.fieldInfo.getPojoFieldExpression(), BigDecimal.class); break; case Types.TIMESTAMP: - activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression()); + activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass, + activeFieldInfo.fieldInfo.getPojoFieldExpression()); break; case Types.TIME: - activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression()); + activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass, + activeFieldInfo.fieldInfo.getPojoFieldExpression()); break; case Types.DATE: - activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass, activeFieldInfo.fieldInfo.getPojoFieldExpression()); + activeFieldInfo.setterOrGetter = PojoUtils.createGetterLong(pojoClass, + activeFieldInfo.fieldInfo.getPojoFieldExpression()); break; default: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcStore.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcStore.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcStore.java index 2e284ad..d9901aa 100644 --- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcStore.java +++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcStore.java @@ -32,8 +32,8 @@ import com.google.common.base.CharMatcher; import com.google.common.base.Splitter; import com.google.common.collect.Iterables; -import com.datatorrent.netlet.util.DTThrowable; import com.datatorrent.lib.db.Connectable; +import com.datatorrent.netlet.util.DTThrowable; /** * A {@link Connectable} that uses jdbc to connect to stores. @@ -125,7 +125,8 @@ public class JdbcStore implements Connectable */ public void setConnectionProperties(String connectionProps) { - String[] properties = Iterables.toArray(Splitter.on(CharMatcher.anyOf(":,")).omitEmptyStrings().trimResults().split(connectionProps), String.class); + String[] properties = Iterables.toArray(Splitter.on(CharMatcher.anyOf(":,")).omitEmptyStrings().trimResults() + .split(connectionProps), String.class); for (int i = 0; i < properties.length; i += 2) { if (i + 1 < properties.length) { connectionProperties.put(properties[i], properties[i + 1]); @@ -163,8 +164,7 @@ public class JdbcStore implements Connectable connection = DriverManager.getConnection(databaseUrl, connectionProperties); logger.debug("JDBC connection Success"); - } - catch (Throwable t) { + } catch (Throwable t) { DTThrowable.rethrow(t); } } @@ -177,8 +177,7 @@ public class JdbcStore implements Connectable { try { connection.close(); - } - catch (SQLException ex) { + } catch (SQLException ex) { throw new RuntimeException("closing database resource", ex); } } @@ -188,8 +187,7 @@ public class JdbcStore implements Connectable { try { return !connection.isClosed(); - } - catch (SQLException e) { + } catch (SQLException e) { throw new RuntimeException("is isConnected", e); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcTransactionalStore.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcTransactionalStore.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcTransactionalStore.java index e4a7229..9bc18b0 100644 --- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcTransactionalStore.java +++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcTransactionalStore.java @@ -24,10 +24,11 @@ import java.sql.SQLException; import javax.validation.constraints.NotNull; -import com.datatorrent.lib.db.TransactionableStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.datatorrent.lib.db.TransactionableStore; + /** * <p>JdbcTransactionalStore class.</p> * @@ -117,7 +118,7 @@ public class JdbcTransactionalStore extends JdbcStore implements Transactionable super.connect(); try { String command = "select " + metaTableWindowColumn + " from " + metaTable + " where " + metaTableAppIdColumn + - " = ? and " + metaTableOperatorIdColumn + " = ?"; + " = ? and " + metaTableOperatorIdColumn + " = ?"; logger.debug(command); lastWindowFetchCommand = connection.prepareStatement(command); @@ -126,18 +127,18 @@ public class JdbcTransactionalStore extends JdbcStore implements Transactionable logger.debug(command); lastWindowInsertCommand = connection.prepareStatement(command); - command = "update " + metaTable + " set " + metaTableWindowColumn + " = ? where " + metaTableAppIdColumn + " = ? " + - " and " + metaTableOperatorIdColumn + " = ?"; + command = "update " + metaTable + " set " + metaTableWindowColumn + " = ? where " + metaTableAppIdColumn + " = ? " + + " and " + metaTableOperatorIdColumn + " = ?"; logger.debug(command); lastWindowUpdateCommand = connection.prepareStatement(command); - command = "delete from " + metaTable + " where " + metaTableAppIdColumn + " = ? and " + metaTableOperatorIdColumn + " = ?"; + command = "delete from " + metaTable + " where " + metaTableAppIdColumn + " = ? and " + metaTableOperatorIdColumn + + " = ?"; logger.debug(command); lastWindowDeleteCommand = connection.prepareStatement(command); connection.setAutoCommit(false); - } - catch (SQLException e) { + } catch (SQLException e) { throw new RuntimeException(e); } } @@ -148,8 +149,7 @@ public class JdbcTransactionalStore extends JdbcStore implements Transactionable if (lastWindowUpdateCommand != null) { try { lastWindowUpdateCommand.close(); - } - catch (SQLException e) { + } catch (SQLException e) { throw new RuntimeException(e); } } @@ -168,8 +168,7 @@ public class JdbcTransactionalStore extends JdbcStore implements Transactionable try { connection.commit(); inTransaction = false; - } - catch (SQLException e) { + } catch (SQLException e) { throw new RuntimeException(e); } } @@ -180,8 +179,7 @@ public class JdbcTransactionalStore extends JdbcStore implements Transactionable try { connection.rollback(); inTransaction = false; - } - catch (SQLException e) { + } catch (SQLException e) { throw new RuntimeException(e); } } @@ -231,16 +229,14 @@ public class JdbcTransactionalStore extends JdbcStore implements Transactionable ResultSet resultSet = lastWindowFetchCommand.executeQuery(); if (resultSet.next()) { lastWindow = resultSet.getLong(1); - } - else { + } else { lastWindowInsertCommand.setString(1, appId); lastWindowInsertCommand.setInt(2, operatorId); lastWindowInsertCommand.setLong(3, -1); lastWindowInsertCommand.executeUpdate(); } return lastWindow; - } - catch (SQLException ex) { + } catch (SQLException ex) { throw new RuntimeException(ex); } } @@ -253,8 +249,7 @@ public class JdbcTransactionalStore extends JdbcStore implements Transactionable lastWindowUpdateCommand.setString(2, appId); lastWindowUpdateCommand.setInt(3, operatorId); lastWindowUpdateCommand.executeUpdate(); - } - catch (SQLException e) { + } catch (SQLException e) { throw new RuntimeException(e); } } @@ -266,8 +261,7 @@ public class JdbcTransactionalStore extends JdbcStore implements Transactionable lastWindowDeleteCommand.setString(1, appId); lastWindowDeleteCommand.setInt(2, operatorId); lastWindowDeleteCommand.executeUpdate(); - } - catch (SQLException e) { + } catch (SQLException e) { throw new RuntimeException(e); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/io/AbstractFTPInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/AbstractFTPInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/AbstractFTPInputOperator.java index 1cf57c5..d34f99b 100644 --- a/library/src/main/java/com/datatorrent/lib/io/AbstractFTPInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/AbstractFTPInputOperator.java @@ -27,15 +27,15 @@ import java.util.Map; import javax.validation.constraints.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.commons.net.ftp.FTP; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.ftp.FTPFileSystem; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import com.datatorrent.api.DefaultOutputPort; - import com.datatorrent.lib.io.fs.AbstractFileInputOperator; /** @@ -90,10 +90,10 @@ public abstract class AbstractFTPInputOperator<T> extends AbstractFileInputOpera { super.partitioned(partitions); for (Partition<AbstractFileInputOperator<T>> partition : partitions.values()) { - ((AbstractFTPInputOperator<T>) partition.getPartitionedInstance()).host = host; - ((AbstractFTPInputOperator<T>) partition.getPartitionedInstance()).port = port; - ((AbstractFTPInputOperator<T>) partition.getPartitionedInstance()).userName = userName; - ((AbstractFTPInputOperator<T>) partition.getPartitionedInstance()).password = password; + ((AbstractFTPInputOperator<T>)partition.getPartitionedInstance()).host = host; + ((AbstractFTPInputOperator<T>)partition.getPartitionedInstance()).port = port; + ((AbstractFTPInputOperator<T>)partition.getPartitionedInstance()).userName = userName; + ((AbstractFTPInputOperator<T>)partition.getPartitionedInstance()).password = password; } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java index 2aa658f..15a208f 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java @@ -217,7 +217,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp /** * File output counters. */ - protected final BasicCounters<MutableLong> fileCounters = new BasicCounters<MutableLong>(MutableLong.class); + protected final BasicCounters<MutableLong> fileCounters = new BasicCounters<>(MutableLong.class); protected StreamCodec<INPUT> streamCodec; @@ -252,8 +252,9 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp * accessed by a read or write. * <p/> * https://code.google.com/p/guava-libraries/wiki/CachesExplained <br/> - * Caches built with CacheBuilder do not perform cleanup and evict values "automatically," or instantly after a value expires, or anything of the sort. - * Instead, it performs small amounts of maintenance during write operations, or during occasional read operations if writes are rare.<br/> + * Caches built with CacheBuilder do not perform cleanup and evict values "automatically," or instantly after a + * value expires, or anything of the sort. Instead, it performs small amounts of maintenance during write + * operations, or during occasional read operations if writes are rare.<br/> * This isn't the most effective way but adds a little bit of optimization. */ private Long expireStreamAfterAcessMillis; @@ -275,8 +276,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp { if (AbstractFileOutputOperator.this.streamCodec == null) { return super.getStreamCodec(); - } - else { + } else { return streamCodec; } } @@ -414,8 +414,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp LOG.debug("rotating file at setup."); rotate(seenFileName); } - } - catch (IOException | ExecutionException e) { + } catch (IOException | ExecutionException e) { throw new RuntimeException(e); } } @@ -441,8 +440,8 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp * @param filename name of the actual file. * @param partFileName name of the part file. When not rolling this is same as filename; otherwise this is the * latest open part file name. - * @param filepath path of the file. When always writing to temp file, this is the path of the temp file; otherwise - * path of the actual file. + * @param filepath path of the file. When always writing to temp file, this is the path of the temp file; + * otherwise path of the actual file. * @throws IOException */ private void recoverFile(String filename, String partFileName, Path filepath) throws IOException @@ -714,7 +713,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp //Close all the streams you can Map<String, FSFilterStreamContext> openStreams = streamsCache.asMap(); - for(String seenFileName: openStreams.keySet()) { + for (String seenFileName : openStreams.keySet()) { FSFilterStreamContext fsFilterStreamContext = openStreams.get(seenFileName); try { long start = System.currentTimeMillis(); @@ -1238,8 +1237,8 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp LOG.debug("rename from tmp {} actual {} ", tmpFileName, fileName); rename(srcPath, destPath); } else if (fs.exists(srcPath)) { - //if the destination and src both exists that means there was a failure between file rename and clearing the endOffset so - //we just delete the tmp file. + /*if the destination and src both exists that means there was a failure between file rename and clearing the + endOffset so we just delete the tmp file*/ LOG.debug("deleting tmp {}", tmpFileName); fs.delete(srcPath, true); } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java index fe9e35f..43445a7 100644 --- a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java @@ -25,14 +25,22 @@ import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.atomic.AtomicReference; -import javax.jms.*; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.Topic; import javax.validation.constraints.Min; import javax.validation.constraints.NotNull; -import org.apache.commons.lang.mutable.MutableLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.commons.lang.mutable.MutableLong; + import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -43,10 +51,9 @@ import com.datatorrent.api.InputOperator; import com.datatorrent.api.Operator; import com.datatorrent.api.Operator.ActivationListener; import com.datatorrent.api.annotation.OperatorAnnotation; - -import com.datatorrent.netlet.util.DTThrowable; import com.datatorrent.lib.counters.BasicCounters; import com.datatorrent.lib.io.IdempotentStorageManager; +import com.datatorrent.netlet.util.DTThrowable; /** * This is the base implementation of a JMS input operator.<br/> @@ -71,8 +78,9 @@ import com.datatorrent.lib.io.IdempotentStorageManager; * @since 0.3.2 */ @OperatorAnnotation(checkpointableWithinAppWindow = false) -public abstract class AbstractJMSInputOperator<T> extends JMSBase implements InputOperator, ActivationListener<OperatorContext>, - MessageListener, ExceptionListener, Operator.IdleTimeHandler, Operator.CheckpointListener +public abstract class AbstractJMSInputOperator<T> extends JMSBase + implements InputOperator, ActivationListener<OperatorContext>, MessageListener, ExceptionListener, + Operator.IdleTimeHandler, Operator.CheckpointListener { protected static final int DEFAULT_BUFFER_SIZE = 10 * 1024; // 10k @@ -102,8 +110,8 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp protected transient long currentWindowId; protected transient int emitCount; - private transient final Set<String> pendingAck; - private transient final Lock lock; + private final transient Set<String> pendingAck; + private final transient Lock lock; public final transient DefaultOutputPort<T> output = new DefaultOutputPort<T>(); @@ -129,8 +137,7 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp synchronized (lock) { try { return messageConsumed(message) && super.add(message); - } - catch (JMSException e) { + } catch (JMSException e) { LOG.error("message consumption", e); throwable.set(e); throw new RuntimeException(e); @@ -162,10 +169,10 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp { try { if (message.getJMSReplyTo() != null) { // Send reply only if the replyTo destination is set - replyProducer.send(message.getJMSReplyTo(), getSession().createTextMessage("Reply: " + message.getJMSMessageID())); + replyProducer.send(message.getJMSReplyTo(), + getSession().createTextMessage("Reply: " + message.getJMSMessageID())); } - } - catch (JMSException ex) { + } catch (JMSException ex) { LOG.error(ex.getLocalizedMessage()); throwable.set(ex); throw new RuntimeException(ex); @@ -199,8 +206,7 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp if (operatorRecoveredWindows != null) { Arrays.sort(operatorRecoveredWindows); } - } - catch (IOException e) { + } catch (IOException e) { throw new RuntimeException("fetching windows", e); } } @@ -223,7 +229,8 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp pendingAck.add(message.getJMSMessageID()); MutableLong receivedCt = counters.getCounter(CounterKeys.RECEIVED); receivedCt.increment(); - LOG.debug("message id: {} buffer size: {} received: {}", message.getJMSMessageID(), holdingBuffer.size(), receivedCt.longValue()); + LOG.debug("message id: {} buffer size: {} received: {}", message.getJMSMessageID(), holdingBuffer.size(), + receivedCt.longValue()); return true; } @@ -239,11 +246,10 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp replyProducer = getSession().createProducer(null); consumer = (isDurable() && isTopic()) ? - getSession().createDurableSubscriber((Topic) getDestination(), consumerName) : - getSession().createConsumer(getDestination()); + getSession().createDurableSubscriber((Topic)getDestination(), consumerName) : + getSession().createConsumer(getDestination()); consumer.setMessageListener(this); - } - catch (JMSException ex) { + } catch (JMSException ex) { throw new RuntimeException(ex); } } @@ -264,7 +270,7 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp { try { @SuppressWarnings("unchecked") - Map<String, T> recoveredData = (Map<String, T>) idempotentStorageManager.load(context.getId(), windowId); + Map<String, T> recoveredData = (Map<String, T>)idempotentStorageManager.load(context.getId(), windowId); if (recoveredData == null) { return; } @@ -272,8 +278,7 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp pendingAck.add(recoveredEntry.getKey()); emit(recoveredEntry.getValue()); } - } - catch (IOException e) { + } catch (IOException e) { throw new RuntimeException("replay", e); } } @@ -306,8 +311,7 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp currentWindowRecoveryState.put(message.getJMSMessageID(), payload); emit(payload); } - } - catch (JMSException e) { + } catch (JMSException e) { throw new RuntimeException("processing msg", e); } } @@ -320,12 +324,10 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp /* nothing to do here, so sleep for a while to avoid busy loop */ try { Thread.sleep(spinMillis); - } - catch (InterruptedException ie) { + } catch (InterruptedException ie) { throw new RuntimeException(ie); } - } - else { + } else { DTThrowable.rethrow(lthrowable); } } @@ -338,7 +340,8 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp * acknowledged have been persisted because they wouldn't be redelivered. Also if they are persisted then * they shouldn't be re-delivered because that would cause duplicates.<br/> * - * This is why when recovery data is persisted and messages are acknowledged, the thread that consumes message is blocked.<br/> + * This is why when recovery data is persisted and messages are acknowledged, the thread that consumes message is + * blocked.<br/> */ @Override public void endWindow() @@ -365,27 +368,24 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp } ackCompleted = true; pendingAck.clear(); - } - catch (Throwable t) { + } catch (Throwable t) { if (!ackCompleted) { LOG.info("confirm recovery of {} for {} does not exist", context.getId(), currentWindowId, t); } DTThrowable.rethrow(t); - } - finally { + } finally { if (stateSaved && !ackCompleted) { try { idempotentStorageManager.delete(context.getId(), currentWindowId); - } - catch (IOException e) { + } catch (IOException e) { LOG.error("unable to delete corrupted state", e); } } } } emitCount = 0; //reset emit count - } - else if (operatorRecoveredWindows != null && currentWindowId < operatorRecoveredWindows[operatorRecoveredWindows.length - 1]) { + } else if (operatorRecoveredWindows != null && + currentWindowId < operatorRecoveredWindows[operatorRecoveredWindows.length - 1]) { //pendingAck is not cleared for the last replayed window of this operator. This is because there is //still a chance that in the previous run the operator crashed after saving the state but before acknowledgement. pendingAck.clear(); @@ -402,8 +402,7 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp { if (isTransacted()) { getSession().commit(); - } - else if (getSessionAckMode(getAckMode()) == Session.CLIENT_ACKNOWLEDGE) { + } else if (getSessionAckMode(getAckMode()) == Session.CLIENT_ACKNOWLEDGE) { lastMsg.acknowledge(); // acknowledge all consumed messages till now } } @@ -418,8 +417,7 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp { try { idempotentStorageManager.deleteUpTo(context.getId(), windowId); - } - catch (IOException e) { + } catch (IOException e) { throw new RuntimeException("committing", e); } } @@ -441,8 +439,7 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase implements Inp consumer = null; super.cleanup(); - } - catch (JMSException ex) { + } catch (JMSException ex) { throw new RuntimeException("at cleanup", ex); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/io/jms/JMSStringInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/jms/JMSStringInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/jms/JMSStringInputOperator.java index bbc8d0f..a72d127 100644 --- a/library/src/main/java/com/datatorrent/lib/io/jms/JMSStringInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/jms/JMSStringInputOperator.java @@ -37,12 +37,10 @@ public class JMSStringInputOperator extends AbstractJMSInputOperator<String> public String convert(Message message) throws JMSException { if (message instanceof TextMessage) { - return ((TextMessage) message).getText(); - } - else if (message instanceof StreamMessage) { - return ((StreamMessage) message).readString(); - } - else { + return ((TextMessage)message).getText(); + } else if (message instanceof StreamMessage) { + return ((StreamMessage)message).readString(); + } else { throw new IllegalArgumentException("Unhandled message type " + message.getClass().getName()); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/metric/AvgAggregator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/metric/AvgAggregator.java b/library/src/main/java/com/datatorrent/lib/metric/AvgAggregator.java index 33f7f8d..ab9a261 100644 --- a/library/src/main/java/com/datatorrent/lib/metric/AvgAggregator.java +++ b/library/src/main/java/com/datatorrent/lib/metric/AvgAggregator.java @@ -22,7 +22,6 @@ import java.io.Serializable; import java.util.Collection; import com.datatorrent.api.annotation.Name; - import com.datatorrent.common.metric.SingleMetricAggregator; /** @@ -38,7 +37,7 @@ public class AvgAggregator implements SingleMetricAggregator, Serializable double sum = 0; for (Object value : metricValues) { - sum += ((Number) value).doubleValue(); + sum += ((Number)value).doubleValue(); } return sum / metricValues.size(); } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/metric/max/DoubleMaxAggregator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/metric/max/DoubleMaxAggregator.java b/library/src/main/java/com/datatorrent/lib/metric/max/DoubleMaxAggregator.java index 40d8c04..13988f9 100644 --- a/library/src/main/java/com/datatorrent/lib/metric/max/DoubleMaxAggregator.java +++ b/library/src/main/java/com/datatorrent/lib/metric/max/DoubleMaxAggregator.java @@ -22,7 +22,6 @@ import java.io.Serializable; import java.util.Collection; import com.datatorrent.api.annotation.Name; - import com.datatorrent.common.metric.SingleMetricAggregator; /** @@ -37,7 +36,7 @@ public class DoubleMaxAggregator implements SingleMetricAggregator, Serializable { Double max = null; for (Object value : metricValues) { - double dval = ((Number) value).doubleValue(); + double dval = ((Number)value).doubleValue(); if (max == null || dval > max) { max = dval; } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/metric/max/FloatMaxAggregator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/metric/max/FloatMaxAggregator.java b/library/src/main/java/com/datatorrent/lib/metric/max/FloatMaxAggregator.java index 68f8f2a..6b228e8 100644 --- a/library/src/main/java/com/datatorrent/lib/metric/max/FloatMaxAggregator.java +++ b/library/src/main/java/com/datatorrent/lib/metric/max/FloatMaxAggregator.java @@ -22,7 +22,6 @@ import java.io.Serializable; import java.util.Collection; import com.datatorrent.api.annotation.Name; - import com.datatorrent.common.metric.SingleMetricAggregator; /** @@ -37,7 +36,7 @@ public class FloatMaxAggregator implements SingleMetricAggregator, Serializable { Float max = null; for (Object value : metricValues) { - float fval = ((Number) value).floatValue(); + float fval = ((Number)value).floatValue(); if (max == null || fval > max) { max = fval; } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/metric/max/IntMaxAggregator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/metric/max/IntMaxAggregator.java b/library/src/main/java/com/datatorrent/lib/metric/max/IntMaxAggregator.java index 71eaba0..2cd0264 100644 --- a/library/src/main/java/com/datatorrent/lib/metric/max/IntMaxAggregator.java +++ b/library/src/main/java/com/datatorrent/lib/metric/max/IntMaxAggregator.java @@ -22,7 +22,6 @@ import java.io.Serializable; import java.util.Collection; import com.datatorrent.api.annotation.Name; - import com.datatorrent.common.metric.SingleMetricAggregator; /** @@ -37,7 +36,7 @@ public class IntMaxAggregator implements SingleMetricAggregator, Serializable { Integer max = null; for (Object value : metricValues) { - int ival = ((Number) value).intValue(); + int ival = ((Number)value).intValue(); if (max == null || ival > max) { max = ival; } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/metric/max/LongMaxAggregator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/metric/max/LongMaxAggregator.java b/library/src/main/java/com/datatorrent/lib/metric/max/LongMaxAggregator.java index 43a1abd..f0aab73 100644 --- a/library/src/main/java/com/datatorrent/lib/metric/max/LongMaxAggregator.java +++ b/library/src/main/java/com/datatorrent/lib/metric/max/LongMaxAggregator.java @@ -22,7 +22,6 @@ import java.io.Serializable; import java.util.Collection; import com.datatorrent.api.annotation.Name; - import com.datatorrent.common.metric.SingleMetricAggregator; /** http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/metric/min/DoubleMinAggregator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/metric/min/DoubleMinAggregator.java b/library/src/main/java/com/datatorrent/lib/metric/min/DoubleMinAggregator.java index d65e744..1b1b712 100644 --- a/library/src/main/java/com/datatorrent/lib/metric/min/DoubleMinAggregator.java +++ b/library/src/main/java/com/datatorrent/lib/metric/min/DoubleMinAggregator.java @@ -22,7 +22,6 @@ import java.io.Serializable; import java.util.Collection; import com.datatorrent.api.annotation.Name; - import com.datatorrent.common.metric.SingleMetricAggregator; /** @@ -37,7 +36,7 @@ public class DoubleMinAggregator implements SingleMetricAggregator, Serializable { Double min = null; for (Object value : metricValues) { - double dval = ((Number) value).doubleValue(); + double dval = ((Number)value).doubleValue(); if (min == null || dval < min) { min = dval; } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/metric/min/FloatMinAggregator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/metric/min/FloatMinAggregator.java b/library/src/main/java/com/datatorrent/lib/metric/min/FloatMinAggregator.java index bbe0861..6713911 100644 --- a/library/src/main/java/com/datatorrent/lib/metric/min/FloatMinAggregator.java +++ b/library/src/main/java/com/datatorrent/lib/metric/min/FloatMinAggregator.java @@ -22,7 +22,6 @@ import java.io.Serializable; import java.util.Collection; import com.datatorrent.api.annotation.Name; - import com.datatorrent.common.metric.SingleMetricAggregator; /** @@ -37,7 +36,7 @@ public class FloatMinAggregator implements SingleMetricAggregator, Serializable { Float min = null; for (Object metric : metricValues) { - float fval = ((Number) metric).floatValue(); + float fval = ((Number)metric).floatValue(); if (min == null || fval < min) { min = fval; } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/metric/min/IntMinAggregator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/metric/min/IntMinAggregator.java b/library/src/main/java/com/datatorrent/lib/metric/min/IntMinAggregator.java index 76ffebc..72c610f 100644 --- a/library/src/main/java/com/datatorrent/lib/metric/min/IntMinAggregator.java +++ b/library/src/main/java/com/datatorrent/lib/metric/min/IntMinAggregator.java @@ -22,7 +22,6 @@ import java.io.Serializable; import java.util.Collection; import com.datatorrent.api.annotation.Name; - import com.datatorrent.common.metric.SingleMetricAggregator; /** @@ -37,7 +36,7 @@ public class IntMinAggregator implements SingleMetricAggregator, Serializable { Integer min = null; for (Object value : metricValues) { - int ival = ((Number) value).intValue(); + int ival = ((Number)value).intValue(); if (min == null || ival < min) { min = ival; } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/main/java/com/datatorrent/lib/metric/min/LongMinAggregator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/metric/min/LongMinAggregator.java b/library/src/main/java/com/datatorrent/lib/metric/min/LongMinAggregator.java index 19a1dab..0deb808 100644 --- a/library/src/main/java/com/datatorrent/lib/metric/min/LongMinAggregator.java +++ b/library/src/main/java/com/datatorrent/lib/metric/min/LongMinAggregator.java @@ -22,7 +22,6 @@ import java.io.Serializable; import java.util.Collection; import com.datatorrent.api.annotation.Name; - import com.datatorrent.common.metric.SingleMetricAggregator; /** @@ -37,7 +36,7 @@ public class LongMinAggregator implements SingleMetricAggregator, Serializable { Long min = null; for (Object value : metricValues) { - long lval = ((Number) value).longValue(); + long lval = ((Number)value).longValue(); if (min == null || lval < min) { min = lval; } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/test/java/com/datatorrent/lib/codec/KryoStreamCodecTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/codec/KryoStreamCodecTest.java b/library/src/test/java/com/datatorrent/lib/codec/KryoStreamCodecTest.java index 0d5c646..4affef7 100644 --- a/library/src/test/java/com/datatorrent/lib/codec/KryoStreamCodecTest.java +++ b/library/src/test/java/com/datatorrent/lib/codec/KryoStreamCodecTest.java @@ -31,79 +31,89 @@ import com.datatorrent.netlet.util.Slice; * Tests for {@link KryoSerializableStreamCodec} * */ -public class KryoStreamCodecTest { +public class KryoStreamCodecTest +{ - public static class TestTuple { - final Integer field; + public static class TestTuple + { + final Integer field; - @SuppressWarnings("unused") - private TestTuple(){ - this.field= 0; - } - - public TestTuple(Integer x){ - this.field= Preconditions.checkNotNull(x,"x"); - } + @SuppressWarnings("unused") + private TestTuple() + { + this.field = 0; + } - @Override - public boolean equals(Object o) { - return o.getClass()== this.getClass() && ((TestTuple)o).field.equals(this.field); - } + public TestTuple(Integer x) + { + this.field = Preconditions.checkNotNull(x, "x"); + } - @Override - public int hashCode() { - return TestTuple.class.hashCode()^ this.field.hashCode(); - } + @Override + public boolean equals(Object o) + { + return o.getClass() == this.getClass() && ((TestTuple)o).field.equals(this.field); } - public static class TestKryoStreamCodec extends KryoSerializableStreamCodec<TestTuple> { + @Override + public int hashCode() + { + return TestTuple.class.hashCode() ^ this.field.hashCode(); + } + } - public TestKryoStreamCodec(){ - super(); - } + public static class TestKryoStreamCodec extends KryoSerializableStreamCodec<TestTuple> + { - @Override - public int getPartition(TestTuple testTuple) { - return testTuple.field; - } + public TestKryoStreamCodec() + { + super(); } - @Test - public void testSomeMethod() throws IOException + @Override + public int getPartition(TestTuple testTuple) { - TestKryoStreamCodec coder = new TestKryoStreamCodec(); - TestKryoStreamCodec decoder = new TestKryoStreamCodec(); + return testTuple.field; + } + } - KryoSerializableStreamCodec<Object> objCoder = new KryoSerializableStreamCodec<Object>(); - Slice sliceOfObj = objCoder.toByteArray(10); - Integer decodedObj = (Integer) objCoder.fromByteArray(sliceOfObj); + @Test + public void testSomeMethod() throws IOException + { + TestKryoStreamCodec coder = new TestKryoStreamCodec(); + TestKryoStreamCodec decoder = new TestKryoStreamCodec(); - Assert.assertEquals("codec", decodedObj.intValue(), 10); + KryoSerializableStreamCodec<Object> objCoder = new KryoSerializableStreamCodec<Object>(); + Slice sliceOfObj = objCoder.toByteArray(10); + Integer decodedObj = (Integer)objCoder.fromByteArray(sliceOfObj); - TestTuple tp= new TestTuple(5); + Assert.assertEquals("codec", decodedObj.intValue(), 10); - Slice dsp1 = coder.toByteArray(tp); - Slice dsp2 = coder.toByteArray(tp); - Assert.assertEquals(dsp1, dsp2); + TestTuple tp = new TestTuple(5); - Object tcObject1 = decoder.fromByteArray(dsp1); - assert (tp.equals(tcObject1)); + Slice dsp1 = coder.toByteArray(tp); + Slice dsp2 = coder.toByteArray(tp); + Assert.assertEquals(dsp1, dsp2); - Object tcObject2 = decoder.fromByteArray(dsp2); - assert (tp.equals(tcObject2)); + Object tcObject1 = decoder.fromByteArray(dsp1); + assert (tp.equals(tcObject1)); - dsp1 = coder.toByteArray(tp); - dsp2 = coder.toByteArray(tp); - Assert.assertEquals(dsp1, dsp2); - } + Object tcObject2 = decoder.fromByteArray(dsp2); + assert (tp.equals(tcObject2)); - @Test - public void testFinalFieldSerialization() throws Exception{ - TestTuple t1 = new TestTuple(5); - TestKryoStreamCodec codec= new TestKryoStreamCodec(); + dsp1 = coder.toByteArray(tp); + dsp2 = coder.toByteArray(tp); + Assert.assertEquals(dsp1, dsp2); + } - Slice dsp = codec.toByteArray(t1); - TestTuple t2 = (TestTuple)codec.fromByteArray(dsp); - Assert.assertEquals("", t1.field, t2.field); - } + @Test + public void testFinalFieldSerialization() throws Exception + { + TestTuple t1 = new TestTuple(5); + TestKryoStreamCodec codec = new TestKryoStreamCodec(); + + Slice dsp = codec.toByteArray(t1); + TestTuple t2 = (TestTuple)codec.fromByteArray(dsp); + Assert.assertEquals("", t1.field, t2.field); + } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/test/java/com/datatorrent/lib/db/cache/CacheManagerTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/db/cache/CacheManagerTest.java b/library/src/test/java/com/datatorrent/lib/db/cache/CacheManagerTest.java index 9ee536e..caa02bf 100644 --- a/library/src/test/java/com/datatorrent/lib/db/cache/CacheManagerTest.java +++ b/library/src/test/java/com/datatorrent/lib/db/cache/CacheManagerTest.java @@ -62,7 +62,7 @@ public class CacheManagerTest @Override public boolean apply(@Nullable Object key) { - return key != null && (Integer) key <= 5; + return key != null && (Integer)key <= 5; } }); } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/b9aa203d/library/src/test/java/com/datatorrent/lib/db/jdbc/JDBCLookupCacheBackedOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JDBCLookupCacheBackedOperatorTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JDBCLookupCacheBackedOperatorTest.java index 9078271..46f49e2 100644 --- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JDBCLookupCacheBackedOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JDBCLookupCacheBackedOperatorTest.java @@ -18,7 +18,12 @@ */ package com.datatorrent.lib.db.jdbc; -import java.sql.*; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.List; @@ -36,7 +41,6 @@ import org.slf4j.LoggerFactory; import com.google.common.collect.Maps; import com.datatorrent.api.Context; - import com.datatorrent.lib.helper.OperatorContextTestHelper; import com.datatorrent.lib.testbench.CollectorTestSink; @@ -49,8 +53,9 @@ public class JDBCLookupCacheBackedOperatorTest private static final String INMEM_DB_DRIVER = org.hsqldb.jdbcDriver.class.getName(); protected static final String TABLE_NAME = "Test_Lookup_Cache"; - protected static TestJDBCLookupCacheBackedOperator lookupCacheBackedOperator = new TestJDBCLookupCacheBackedOperator(); - protected static CollectorTestSink<Object> sink = new CollectorTestSink<Object>(); + protected static TestJDBCLookupCacheBackedOperator lookupCacheBackedOperator = + new TestJDBCLookupCacheBackedOperator(); + protected static CollectorTestSink<Object> sink = new CollectorTestSink<>(); protected static final Map<Integer, String> mapping = Maps.newHashMap(); static { @@ -61,9 +66,10 @@ public class JDBCLookupCacheBackedOperatorTest mapping.put(5, "five"); } - protected static transient final Logger logger = LoggerFactory.getLogger(JDBCLookupCacheBackedOperatorTest.class); + protected static final transient Logger logger = LoggerFactory.getLogger( + JDBCLookupCacheBackedOperatorTest.class); - private final static Exchanger<List<Object>> bulkValuesExchanger = new Exchanger<List<Object>>(); + private static final Exchanger<List<Object>> bulkValuesExchanger = new Exchanger<>(); public static class TestJDBCLookupCacheBackedOperator extends JDBCLookupCacheBackedOperator<String> { @@ -83,15 +89,15 @@ public class JDBCLookupCacheBackedOperatorTest @Override protected void preparePutStatement(PreparedStatement putStatement, Object key, Object value) throws SQLException { - putStatement.setInt(1, (Integer) key); - putStatement.setString(2, (String) value); + putStatement.setInt(1, (Integer)key); + putStatement.setString(2, (String)value); } @Override protected void prepareGetStatement(PreparedStatement getStatement, Object key) throws SQLException { - getStatement.setInt(1, (Integer) key); + getStatement.setInt(1, (Integer)key); } @Override @@ -121,8 +127,7 @@ public class JDBCLookupCacheBackedOperatorTest List<Object> values = super.getAll(keys); try { bulkValuesExchanger.exchange(values); - } - catch (InterruptedException e) { + } catch (InterruptedException e) { throw new RuntimeException(e); } return values; @@ -164,7 +169,7 @@ public class JDBCLookupCacheBackedOperatorTest Statement stmt = con.createStatement(); String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME - + " (col1 INTEGER, col2 VARCHAR(20))"; + + " (col1 INTEGER, col2 VARCHAR(20))"; stmt.executeUpdate(createTable); stmt.executeUpdate("Delete from " + TABLE_NAME); @@ -172,8 +177,8 @@ public class JDBCLookupCacheBackedOperatorTest // populate the database for (Map.Entry<Integer, String> entry : mapping.entrySet()) { String insert = "INSERT INTO " + TABLE_NAME - + " (col1, col2) VALUES (" + entry.getKey() + ", '" - + entry.getValue() + "')"; + + " (col1, col2) VALUES (" + entry.getKey() + ", '" + + entry.getValue() + "')"; stmt.executeUpdate(insert); }
