Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/devel-3 40b0c42e4 -> 958471a4d
MLHR-1895 #resolve #comment refactor AbstractAppDataSnapshotServer to support override or provide QueryExecutor Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/d9957541 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/d9957541 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/d9957541 Branch: refs/heads/devel-3 Commit: d9957541a6f0521b4d16a011f02f35a863c9a10b Parents: 02f48e1 Author: bright <[email protected]> Authored: Wed Nov 11 13:44:36 2015 -0800 Committer: bright <[email protected]> Committed: Thu Nov 12 15:05:12 2015 -0800 ---------------------------------------------------------------------- .../snapshot/AbstractAppDataSnapshotServer.java | 132 +++++++++++++------ 1 file changed, 89 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/d9957541/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java index a309746..ded099b 100644 --- a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java +++ b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java @@ -19,20 +19,26 @@ package com.datatorrent.lib.appdata.snapshot; import java.io.IOException; - import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; import javax.validation.constraints.NotNull; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import 
org.apache.commons.lang3.mutable.MutableLong; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; +import com.datatorrent.common.experimental.AppData; +import com.datatorrent.common.experimental.AppData.EmbeddableQueryInfoProvider; import com.datatorrent.lib.appdata.StoreUtils; import com.datatorrent.lib.appdata.gpo.GPOMutable; import com.datatorrent.lib.appdata.query.AppDataWindowEndQueueManager; @@ -40,19 +46,17 @@ import com.datatorrent.lib.appdata.query.QueryExecutor; import com.datatorrent.lib.appdata.query.QueryManagerSynchronous; import com.datatorrent.lib.appdata.query.serde.MessageDeserializerFactory; import com.datatorrent.lib.appdata.query.serde.MessageSerializerFactory; -import com.datatorrent.lib.appdata.schemas.*; +import com.datatorrent.lib.appdata.schemas.DataQuerySnapshot; +import com.datatorrent.lib.appdata.schemas.DataResultSnapshot; import com.datatorrent.lib.appdata.schemas.Message; import com.datatorrent.lib.appdata.schemas.Query; import com.datatorrent.lib.appdata.schemas.Result; - -import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.Operator; -import com.datatorrent.api.annotation.InputPortFieldAnnotation; - -import com.datatorrent.common.experimental.AppData; -import com.datatorrent.common.experimental.AppData.EmbeddableQueryInfoProvider; +import com.datatorrent.lib.appdata.schemas.ResultFormatter; +import com.datatorrent.lib.appdata.schemas.SchemaQuery; +import com.datatorrent.lib.appdata.schemas.SchemaRegistry; +import com.datatorrent.lib.appdata.schemas.SchemaRegistrySingle; +import com.datatorrent.lib.appdata.schemas.SchemaResult; 
+import com.datatorrent.lib.appdata.schemas.SnapshotSchema; /** * This is an abstract operator for the {@link SnapshotSchema}. This operator is designed to accept input data @@ -67,7 +71,7 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper /** * The {@link QueryManagerSynchronous} for the operator. */ - private transient QueryManagerSynchronous<Query, Void, MutableLong, Result> queryProcessor; + protected transient QueryManagerSynchronous<Query, Void, MutableLong, Result> queryProcessor; /** * The {@link MessageDeserializerFactory} for the operator. */ @@ -91,13 +95,18 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper /** * The current data to be served by the operator. */ - private List<GPOMutable> currentData = Lists.newArrayList(); + protected List<GPOMutable> currentData = Lists.newArrayList(); private EmbeddableQueryInfoProvider<String> embeddableQueryInfoProvider; private final transient ConcurrentLinkedQueue<SchemaResult> schemaQueue = new ConcurrentLinkedQueue<>(); @AppData.ResultPort public final transient DefaultOutputPort<String> queryResult = new DefaultOutputPort<>(); + /** + * The queryExecutor execute the query and return the result. + */ + protected QueryExecutor<Query, Void, MutableLong, Result> queryExecutor; + @AppData.QueryPort @InputPortFieldAnnotation(optional=true) public transient final DefaultInputPort<String> query = new DefaultInputPort<String>() @@ -105,44 +114,59 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper @Override public void process(String queryJSON) { - LOG.debug("query {}", queryJSON); - Message query = null; - - try { - query = queryDeserializerFactory.deserialize(queryJSON); - } catch (IOException ex) { - LOG.error("Error parsing query: {}", queryJSON); - LOG.error("{}", ex); - return; - } + processQuery(queryJSON); + } + }; + + /** + * process the query send. + * provide this method to give sub class a chance to override. 
+ * @param queryJSON + */ + protected void processQuery(String queryJSON) + { + LOG.debug("query {}", queryJSON); + Message query = null; + + try { + query = queryDeserializerFactory.deserialize(queryJSON); + } catch (IOException ex) { + LOG.error("Error parsing query: {}", queryJSON); + LOG.error("{}", ex); + return; + } - if (query instanceof SchemaQuery) { - SchemaResult schemaResult = schemaRegistry.getSchemaResult((SchemaQuery)query); + if (query instanceof SchemaQuery) { + SchemaResult schemaResult = schemaRegistry.getSchemaResult((SchemaQuery)query); - if (schemaResult != null) { - LOG.debug("queueing {}", schemaResult); - schemaQueue.add(schemaResult); - } - } else if (query instanceof DataQuerySnapshot) { - queryProcessor.enqueue((DataQuerySnapshot)query, null, null); + if (schemaResult != null) { + LOG.debug("queueing {}", schemaResult); + schemaQueue.add(schemaResult); } + } else if (query instanceof DataQuerySnapshot) { + queryProcessor.enqueue((DataQuerySnapshot)query, null, null); } - }; + } public transient final DefaultInputPort<List<INPUT_EVENT>> input = new DefaultInputPort<List<INPUT_EVENT>>() { @Override public void process(List<INPUT_EVENT> rows) { - currentData.clear(); - - for(INPUT_EVENT inputEvent: rows) { - GPOMutable gpoRow = convert(inputEvent); - currentData.add(gpoRow); - } + processData(rows); } }; + protected void processData(List<INPUT_EVENT> rows) + { + currentData.clear(); + + for (INPUT_EVENT inputEvent : rows) { + GPOMutable gpoRow = convert(inputEvent); + currentData.add(gpoRow); + } + } + /** * Create operator. 
*/ @@ -174,8 +198,8 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper schema = new SnapshotSchema(snapshotSchemaJSON); schemaRegistry = new SchemaRegistrySingle(schema); //Setup for query processing - queryProcessor = QueryManagerSynchronous.newInstance(new SnapshotComputer(), new AppDataWindowEndQueueManager<Query, Void>()); - + setupQueryProcessor(); + queryDeserializerFactory = new MessageDeserializerFactory(SchemaQuery.class, DataQuerySnapshot.class); queryDeserializerFactory.setContext(DataQuerySnapshot.class, schemaRegistry); @@ -190,6 +214,12 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper embeddableQueryInfoProvider.setup(context); } } + + protected void setupQueryProcessor() + { + queryProcessor = QueryManagerSynchronous.newInstance(queryExecutor == null ? new SnapshotComputer() : queryExecutor, + new AppDataWindowEndQueueManager<Query, Void>()); + } @Override public void beginWindow(long windowId) @@ -312,4 +342,20 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper } private static final Logger LOG = LoggerFactory.getLogger(AbstractAppDataSnapshotServer.class); + + public QueryExecutor<Query, Void, MutableLong, Result> getQueryExecutor() + { + return queryExecutor; + } + + public void setQueryExecutor(QueryExecutor<Query, Void, MutableLong, Result> queryExecutor) + { + this.queryExecutor = queryExecutor; + } + + public List<GPOMutable> getCurrentData() + { + return currentData; + } + }
