MLHR-1949 #resolve #comment removed unnecessary wait in JdbcInput
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/331e0083 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/331e0083 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/331e0083 Branch: refs/heads/master Commit: 331e00830a67031c540ba08d89f6b77afdc5ba1d Parents: 8d48e40 Author: Chandni Singh <[email protected]> Authored: Wed Dec 16 13:20:33 2015 -0800 Committer: Chandni Singh <[email protected]> Committed: Wed Dec 16 13:20:33 2015 -0800 ---------------------------------------------------------------------- .../datatorrent/lib/db/jdbc/AbstractJdbcInputOperator.java | 7 ------- .../com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java | 8 -------- 2 files changed, 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/331e0083/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcInputOperator.java index fe6b077..c473ce3 100644 --- a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcInputOperator.java @@ -49,7 +49,6 @@ public abstract class AbstractJdbcInputOperator<T> extends AbstractStoreInputOpe { private static final Logger logger = LoggerFactory.getLogger(AbstractJdbcInputOperator.class); protected transient Statement queryStatement; - private transient int waitForDataTimeout; /** * Any concrete class has to override this method to convert a Database row into Tuple. @@ -85,22 +84,16 @@ public abstract class AbstractJdbcInputOperator<T> extends AbstractStoreInputOpe outputPort.emit(tuple); } while (result.next()); - } else { - // No rows available wait for some time before retrying so as to not continuously slam the database - Thread.sleep(waitForDataTimeout); } } catch (SQLException ex) { store.disconnect(); throw new RuntimeException(String.format("Error while running query: %s", query), ex); - } catch (InterruptedException ex) { - throw new RuntimeException(ex); } } @Override public void setup(OperatorContext context) { - waitForDataTimeout = context.getValue(OperatorContext.SPIN_MILLIS); super.setup(context); try { queryStatement = store.getConnection().createStatement(); http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/331e0083/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java index 3aa6fac..46c18dc 100644 --- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInputOperator.java @@ -95,7 +95,6 @@ public class JdbcPOJOInputOperator extends AbstractJdbcInputOperator<Object> protected transient Class<?> pojoClass; protected int pageNumber; - private transient long sleepMillis; @OutputPortFieldAnnotation(schemaRequired = true) public final transient DefaultOutputPort<Object> outputPort = new DefaultOutputPort<Object>() @@ -119,7 +118,6 @@ public class JdbcPOJOInputOperator extends AbstractJdbcInputOperator<Object> public void setup(Context.OperatorContext context) { Preconditions.checkArgument(query != null || tableName != null, "both query and table name are not set"); - sleepMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS); super.setup(context); try { @@ -210,12 +208,6 @@ public class JdbcPOJOInputOperator extends AbstractJdbcInputOperator<Object> store.disconnect(); throw new RuntimeException(ex); } - } else { - try { - Thread.sleep(sleepMillis); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } } }
