[https://issues.apache.org/jira/browse/APEXMALHAR-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15413008#comment-15413008]
ASF GitHub Bot commented on APEXMALHAR-2172:
--------------------------------------------
Github user bhupeshchawda commented on a diff in the pull request:
https://github.com/apache/apex-malhar/pull/358#discussion_r74003272
--- Diff: library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java ---
@@ -286,271 +146,233 @@ public AbstractJdbcPollInputOperator()
public void setup(OperatorContext context)
{
super.setup(context);
- spinMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS);
+ intializeDSLContext();
+ if (scanService == null) {
+ scanService = Executors.newScheduledThreadPool(1);
+ }
execute = true;
- cause = new AtomicReference<Throwable>();
- emitQueue = new ArrayBlockingQueue<List<T>>(queueCapacity);
- this.context = context;
+ emitQueue = new LinkedBlockingDeque<>(queueCapacity);
operatorId = context.getId();
+ windowManager.setup(context);
+ }
- try {
+ private void intializeDSLContext()
+ {
+ create = DSL.using(store.getConnection(),
JDBCUtils.dialect(store.getDatabaseUrl()));
+ }
+
+ @Override
+ public void activate(OperatorContext context)
+ {
+ initializePreparedStatement();
+ long largestRecoveryWindow = windowManager.getLargestRecoveryWindow();
+ if (largestRecoveryWindow == Stateless.WINDOW_ID
+ || context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID)
> largestRecoveryWindow) {
+ scanService.scheduleAtFixedRate(new DBPoller(), 0, pollInterval,
TimeUnit.MILLISECONDS);
+ }
+ }
- //If its a range query pass upper and lower bounds
- //If its a polling query pass only the lower bound
- if (getRangeQueryPair().getValue() != null) {
- ps = store.getConnection()
- .prepareStatement(
- JdbcMetaDataUtility.buildRangeQuery(getTableName(),
getKey(), rangeQueryPair.getKey(),
- rangeQueryPair.getValue()),
- java.sql.ResultSet.TYPE_FORWARD_ONLY,
java.sql.ResultSet.CONCUR_READ_ONLY);
+ protected void initializePreparedStatement()
+ {
+ try {
+ // If its a range query pass upper and lower bounds, If its a
polling query pass only the lower bound
+ if (isPollerPartition) {
+ ps =
store.getConnection().prepareStatement(buildRangeQuery(rangeQueryPair.getKey(),
Integer.MAX_VALUE),
+ TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
} else {
ps = store.getConnection().prepareStatement(
- JdbcMetaDataUtility.buildPollableQuery(getTableName(),
getKey(), rangeQueryPair.getKey()),
- java.sql.ResultSet.TYPE_FORWARD_ONLY,
java.sql.ResultSet.CONCUR_READ_ONLY);
- isPollable = true;
+ buildRangeQuery(rangeQueryPair.getKey(),
(rangeQueryPair.getValue() - rangeQueryPair.getKey())),
+ TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
}
-
} catch (SQLException e) {
LOG.error("Exception in initializing the range query for a given
partition", e);
throw new RuntimeException(e);
}
- windowManager.setup(context);
- LOG.debug("super setup done...");
}
@Override
public void beginWindow(long windowId)
{
currentWindowId = windowId;
-
- isReplayed = false;
-
if (currentWindowId <= windowManager.getLargestRecoveryWindow()) {
try {
replay(currentWindowId);
+ return;
} catch (SQLException e) {
LOG.error("Exception in replayed windows", e);
throw new RuntimeException(e);
}
}
-
- if (isReplayed && currentWindowId ==
windowManager.getLargestRecoveryWindow()) {
- try {
- if (!isPollable && rangeQueryPair.getValue() != null) {
-
- ps = store.getConnection().prepareStatement(
- JdbcMetaDataUtility.buildGTRangeQuery(getTableName(),
getKey(), previousUpperBound,
- rangeQueryPair.getValue()),
- java.sql.ResultSet.TYPE_FORWARD_ONLY,
java.sql.ResultSet.CONCUR_READ_ONLY);
- } else {
- String bound = null;
- if (previousUpperBound == null) {
- bound = getRangeQueryPair().getKey();
- } else {
- bound = previousUpperBound;
- }
- ps = store.getConnection().prepareStatement(
- JdbcMetaDataUtility.buildPollableQuery(getTableName(),
getKey(), bound),
- java.sql.ResultSet.TYPE_FORWARD_ONLY,
java.sql.ResultSet.CONCUR_READ_ONLY);
- isPollable = true;
- }
- isReplayed = false;
- LOG.debug("Prepared statement after re-initialization - {} ",
ps.toString());
- } catch (SQLException e) {
- // TODO Auto-generated catch block
- throw new RuntimeException(e);
- }
+ if (isPollerPartition) {
+ updatePollQuery();
+ isPolled = false;
}
+ lowerBound = lastEmittedRecord;
+ }
- //Reset the pollable query with the updated upper and lower bounds
- if (isPollable) {
+ private void updatePollQuery()
+ {
+ if ((lastPolledBound != lastEmittedRecord)) {
+ if (lastEmittedRecord == null) {
+ lastPolledBound = rangeQueryPair.getKey();
+ } else {
+ lastPolledBound = lastEmittedRecord;
+ }
try {
- String bound = null;
- if (previousUpperBound == null && highestPolled == null) {
- bound = getRangeQueryPair().getKey();
- } else {
- bound = highestPolled;
- }
- ps = store.getConnection().prepareStatement(
- JdbcMetaDataUtility.buildPollableQuery(getTableName(),
getKey(), bound),
- java.sql.ResultSet.TYPE_FORWARD_ONLY,
java.sql.ResultSet.CONCUR_READ_ONLY);
- LOG.debug("Polling query {} {}", ps.toString(), currentWindowId);
- isPolled = false;
+ ps =
store.getConnection().prepareStatement(buildRangeQuery(lastPolledBound,
Integer.MAX_VALUE),
+ TYPE_FORWARD_ONLY, CONCUR_READ_ONLY);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
- lower = null;
- upper = null;
-
- //Check if a thread is already active and start only if its no
- //Do not start the thread from setup, will conflict with the replay
- if (dbPoller == null && !isReplayed) {
- //If this is not a replayed state, reset the ps to highest read
offset + 1,
- //keep the upper bound as the one that was initialized after static
partitioning
- LOG.info("Statement when re-initialized {}", ps.toString());
- dbPoller = new Thread(new DBPoller());
- dbPoller.start();
- }
}
@Override
public void emitTuples()
{
- if (isReplayed) {
+ if (currentWindowId <= windowManager.getLargestRecoveryWindow()) {
return;
}
-
- List<T> tuples;
-
- if ((tuples = emitQueue.poll()) != null) {
- for (Object tuple : tuples) {
- if (lower == null) {
- lower = tuple.toString();
- }
- upper = tuple.toString();
- outputPort.emit((T)tuple);
+ int pollSize = (emitQueue.size() < batchSize) ? emitQueue.size() :
batchSize;
+ while (pollSize-- > 0) {
+ T obj = emitQueue.poll();
+ if (obj != null) {
+ emitTuple(obj);
}
+ lastEmittedRecord++;
}
}
+ protected void emitTuple(T obj)
+ {
+ outputPort.emit(obj);
+ }
+
@Override
public void endWindow()
{
try {
if (currentWindowId > windowManager.getLargestRecoveryWindow()) {
- if (currentWindowRecoveryState == null) {
- currentWindowRecoveryState = new MutablePair<String, String>();
- }
- if (lower != null && upper != null) {
- previousUpperBound = upper;
- isPolled = true;
- }
- MutablePair<String, String> windowBoundaryPair = new
MutablePair<>(lower, upper);
- currentWindowRecoveryState = windowBoundaryPair;
+ currentWindowRecoveryState = new MutablePair<>(lowerBound,
lastEmittedRecord);
windowManager.save(currentWindowRecoveryState, operatorId,
currentWindowId);
}
} catch (IOException e) {
throw new RuntimeException("saving recovery", e);
}
- currentWindowRecoveryState = new MutablePair<>();
- }
-
- public int getPartitionCount()
- {
- return partitionCount;
+ if (threadException != null) {
+ store.disconnect();
+ DTThrowable.rethrow(threadException.get());
+ }
}
- public void setPartitionCount(int partitionCount)
+ @Override
+ public void deactivate()
{
- this.partitionCount = partitionCount;
+ scanService.shutdownNow();
+ store.disconnect();
}
- @Override
- public void activate(Context cntxt)
+ protected void pollRecords()
{
- if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) !=
Stateless.WINDOW_ID
- && context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) <
windowManager.getLargestRecoveryWindow()) {
- // If it is a replay state, don't start any threads here
+ if (isPolled) {
return;
}
- }
-
- @Override
- public void deactivate()
- {
try {
- if (dbPoller != null && dbPoller.isAlive()) {
- dbPoller.interrupt();
- dbPoller.join();
+ ps.setFetchSize(getFetchSize());
+ ResultSet result = ps.executeQuery();
+ if (result.next()) {
+ do {
+ emitQueue.offer(getTuple(result));
+ } while (result.next());
}
- } catch (InterruptedException ex) {
- // log and ignore, ending execution anyway
- LOG.error("exception in poller thread: ", ex);
- }
- }
-
- @Override
- public void handleIdleTime()
- {
- if (execute) {
- try {
- Thread.sleep(spinMillis);
- } catch (InterruptedException ie) {
- throw new RuntimeException(ie);
+ isPolled = true;
+ } catch (SQLException ex) {
+ execute = false;
+ threadException = new AtomicReference<Throwable>(ex);
+ } finally {
+ if (!isPollerPartition) {
+ store.disconnect();
}
- } else {
- LOG.error("Exception: ", cause);
- DTThrowable.rethrow(cause.get());
}
}
+ public abstract T getTuple(ResultSet result);
+
protected void replay(long windowId) throws SQLException
{
- isReplayed = true;
- MutablePair<String, String> recoveredData = new MutablePair<String,
String>();
try {
- recoveredData = (MutablePair<String,
String>)windowManager.load(operatorId, windowId);
-
- if (recoveredData != null) {
- //skip the window and return if there was no incoming data in the
window
- if (recoveredData.left == null || recoveredData.right == null) {
- return;
- }
+ MutablePair<Integer, Integer> recoveredData = (MutablePair<Integer,
Integer>)windowManager.load(operatorId,
+ windowId);
- if (recoveredData.right.equals(rangeQueryPair.getValue()) ||
recoveredData.right.equals(previousUpperBound)) {
- LOG.info("Matched so returning");
- return;
- }
-
- JdbcPollInputOperator jdbcPoller = new JdbcPollInputOperator();
- jdbcPoller.setStore(store);
- jdbcPoller.setKey(getKey());
- jdbcPoller.setPartitionCount(getPartitionCount());
- jdbcPoller.setPollInterval(getPollInterval());
- jdbcPoller.setTableName(getTableName());
- jdbcPoller.setBatchSize(getBatchSize());
- isPollable = false;
+ if (recoveredData != null && shouldReplayWindow(recoveredData)) {
+ LOG.debug("[Recovering Window ID - {} for record range: {}, {}]",
windowId, recoveredData.left,
+ recoveredData.right);
- LOG.debug("[Window ID -" + windowId + "," + recoveredData.left +
"," + recoveredData.right + "]");
+ ps = store.getConnection().prepareStatement(
+ buildRangeQuery(recoveredData.left, (recoveredData.right -
recoveredData.left)), TYPE_FORWARD_ONLY,
+ CONCUR_READ_ONLY);
+ LOG.info("Query formed to recover data - {}", ps.toString());
- jdbcPoller.setRangeQueryPair(new KeyValPair<String,
String>(recoveredData.left, recoveredData.right));
+ emitReplayedTuples(ps);
- jdbcPoller.ps = jdbcPoller.store.getConnection().prepareStatement(
- JdbcMetaDataUtility.buildRangeQuery(jdbcPoller.getTableName(),
jdbcPoller.getKey(),
- jdbcPoller.getRangeQueryPair().getKey(),
jdbcPoller.getRangeQueryPair().getValue()),
- java.sql.ResultSet.TYPE_FORWARD_ONLY,
java.sql.ResultSet.CONCUR_READ_ONLY);
- LOG.info("Query formed for recovered data - {}",
jdbcPoller.ps.toString());
+ }
- emitReplayedTuples(jdbcPoller.ps);
+ if (currentWindowId == windowManager.getLargestRecoveryWindow()) {
+ try {
+ if (!isPollerPartition && rangeQueryPair.getValue() != null) {
+ ps = store.getConnection().prepareStatement(
+ buildRangeQuery(lastEmittedRecord,
(rangeQueryPair.getValue() - lastEmittedRecord)), TYPE_FORWARD_ONLY,
+ CONCUR_READ_ONLY);
+ } else {
+ Integer bound = null;
+ if (lastEmittedRecord == null) {
+ bound = rangeQueryPair.getKey();
+ } else {
+ bound = lastEmittedRecord;
+ }
+ ps =
store.getConnection().prepareStatement(buildRangeQuery(bound,
Integer.MAX_VALUE), TYPE_FORWARD_ONLY,
+ CONCUR_READ_ONLY);
+ }
+ scanService.scheduleAtFixedRate(new DBPoller(), 0, pollInterval,
TimeUnit.MILLISECONDS);
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
}
} catch (IOException e) {
throw new RuntimeException("replay", e);
--- End diff --
Could this exception have a more descriptive message than just "replay"?
> Update JDBC poll input operator to fix issues
> ---------------------------------------------
>
> Key: APEXMALHAR-2172
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2172
> Project: Apache Apex Malhar
> Issue Type: Improvement
> Reporter: Priyanka Gugale
> Assignee: Priyanka Gugale
>
> Update JDBCPollInputOperator to:
> 1. Fix small bugs
> 2. Use the jOOQ query DSL library to construct SQL queries
> 3. Make code more readable
> 4. Use row counts rather than key column values to partition reads
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)