This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/apex-malhar.git
The following commit(s) were added to refs/heads/master by this push: new 783b7fe APEXMALHAR-2530 Refactored AbstractAppDataSnapshotServer so that subclasses don't need schemas 783b7fe is described below commit 783b7fe3a1d9dd9c3ef7803e049d7d317b2aefd1 Author: David Yan <david...@apache.org> AuthorDate: Sat Jul 29 23:33:54 2017 -0700 APEXMALHAR-2530 Refactored AbstractAppDataSnapshotServer so that subclasses don't need schemas --- .../snapshot/AbstractAppDataSnapshotServer.java | 35 +----- .../lib/io/PubSubWebSocketAppDataQuery.java | 14 --- .../lib/io/PubSubWebSocketAppDataResult.java | 16 +-- .../malhar/lib/appdata/AbstractAppDataServer.java | 125 +++++++++++++++++++++ 4 files changed, 139 insertions(+), 51 deletions(-) diff --git a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java index 19e142b..88d89c4 100644 --- a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java +++ b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java @@ -28,6 +28,7 @@ import javax.validation.constraints.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.apex.malhar.lib.appdata.AbstractAppDataServer; import org.apache.commons.lang3.mutable.MutableLong; import com.google.common.base.Preconditions; @@ -36,11 +37,9 @@ import com.google.common.collect.Lists; import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.Operator; import com.datatorrent.api.annotation.InputPortFieldAnnotation; import com.datatorrent.common.experimental.AppData; import com.datatorrent.common.experimental.AppData.EmbeddableQueryInfoProvider; -import com.datatorrent.lib.appdata.StoreUtils; import com.datatorrent.lib.appdata.gpo.GPOMutable; import 
com.datatorrent.lib.appdata.query.AppDataWindowEndQueueManager; import com.datatorrent.lib.appdata.query.QueryExecutor; @@ -67,7 +66,7 @@ import com.datatorrent.lib.appdata.schemas.SnapshotSchema; * @param <INPUT_EVENT> The type of the input events that the operator accepts. * @since 3.0.0 */ -public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Operator, AppData.Store<String> +public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> extends AbstractAppDataServer<String> { /** * The {@link QueryManagerSynchronous} for the operator. @@ -198,6 +197,7 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper @Override public void setup(OperatorContext context) { + super.setup(context); setupSchema(); schemaRegistry = new SchemaRegistrySingle(schema); @@ -209,13 +209,6 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper queryDeserializerFactory.setContext(DataQuerySnapshot.class, schemaRegistry); resultSerializerFactory = new MessageSerializerFactory(resultFormatter); queryProcessor.setup(context); - - if (embeddableQueryInfoProvider != null) { - embeddableQueryInfoProvider.enableEmbeddedMode(); - LOG.info("An embeddable query operator is being used of class {}.", embeddableQueryInfoProvider.getClass().getName()); - StoreUtils.attachOutputPortToInputPort(embeddableQueryInfoProvider.getOutputPort(), query); - embeddableQueryInfoProvider.setup(context); - } } protected void setupSchema() @@ -235,19 +228,14 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper @Override public void beginWindow(long windowId) { - if (embeddableQueryInfoProvider != null) { - embeddableQueryInfoProvider.beginWindow(windowId); - } - + super.beginWindow(windowId); queryProcessor.beginWindow(windowId); } @Override public void endWindow() { - if (embeddableQueryInfoProvider != null) { - embeddableQueryInfoProvider.endWindow(); - } + super.endWindow(); { Result result; @@ -275,21 
+263,10 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper @Override public void teardown() { - if (embeddableQueryInfoProvider != null) { - embeddableQueryInfoProvider.teardown(); - } - + super.teardown(); queryProcessor.teardown(); } - @Override - public void deactivate() - { - if (embeddableQueryInfoProvider != null) { - embeddableQueryInfoProvider.deactivate(); - } - } - /** * Gets the JSON for the schema. * @return the JSON for the schema. diff --git a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java index 7cf883f..3f2029e 100644 --- a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java +++ b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java @@ -23,7 +23,6 @@ import java.net.URISyntaxException; import javax.validation.constraints.Min; -import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; import org.slf4j.Logger; @@ -158,19 +157,6 @@ public class PubSubWebSocketAppDataQuery extends PubSubWebSocketInputOperator<St try { JSONObject jo = new JSONObject(message); - JSONArray ja = jo.names(); - - //Make sure that only the correct keys are in the first level of JSON - for (int keyIndex = 0; keyIndex < ja.length(); keyIndex++) { - String key = ja.getString(keyIndex); - if (!(PubSubMessage.DATA_KEY.equals(key) || - PubSubMessage.TOPIC_KEY.equals(key) || - PubSubMessage.TYPE_KEY.equals(key))) { - logger.error("{} is not a valid key in the first level of the following pubsub message:\n{}", key, message); - return null; - } - } - data = jo.getString(PubSubMessage.DATA_KEY); } catch (JSONException e) { return null; diff --git a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResult.java b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResult.java index 
e1f3fa1..7800fa4 100644 --- a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResult.java +++ b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResult.java @@ -97,16 +97,16 @@ public class PubSubWebSocketAppDataResult extends PubSubWebSocketOutputOperator< throw new RuntimeException(ex); } - String id; - - try { - id = jo.getString("id"); - } catch (JSONException ex) { - throw new RuntimeException(ex); + String topic = getTopic(); + + if (jo.has("id")) { + try { + topic += "." + jo.getString("id"); + } catch (JSONException ex) { + throw new RuntimeException(ex); + } } - String topic = getTopic() + "." + id; - JSONObject output = new JSONObject(); try { diff --git a/library/src/main/java/org/apache/apex/malhar/lib/appdata/AbstractAppDataServer.java b/library/src/main/java/org/apache/apex/malhar/lib/appdata/AbstractAppDataServer.java new file mode 100644 index 0000000..4b57d66 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/appdata/AbstractAppDataServer.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.appdata; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; +import com.datatorrent.common.experimental.AppData; +import com.datatorrent.lib.appdata.StoreUtils; + +/** + * This is an operator that lays the framework of serving data to queries coming from an embeddable query info provider, + * which may be an input operator. + * Subclasses are expected to implement the processQuery method for the logic of handling the query. Note that + * processQuery cannot directly emit to the operator's output port because it's called from the thread of the + * embeddable query info provider. + */ +public abstract class AbstractAppDataServer<QueryType> implements Operator, AppData.Store<QueryType> +{ + @AppData.QueryPort + @InputPortFieldAnnotation(optional = true) + public final transient DefaultInputPort<QueryType> query = new DefaultInputPort<QueryType>() + { + @Override + public void process(QueryType query) + { + processQuery(query); + } + }; + + + protected AppData.EmbeddableQueryInfoProvider<QueryType> embeddableQueryInfoProvider; + + + protected abstract void processQuery(QueryType query); + + @Override + public void activate(Context.OperatorContext ctx) + { + if (embeddableQueryInfoProvider != null) { + embeddableQueryInfoProvider.activate(ctx); + } + } + + @SuppressWarnings("unchecked") + @Override + public void setup(Context.OperatorContext context) + { + if (embeddableQueryInfoProvider != null) { + embeddableQueryInfoProvider.enableEmbeddedMode(); + LOG.info("An embeddable query operator is being used of class {}.", embeddableQueryInfoProvider.getClass().getName()); + StoreUtils.attachOutputPortToInputPort(embeddableQueryInfoProvider.getOutputPort(), query); + 
embeddableQueryInfoProvider.setup(context); + } + } + + @Override + public void beginWindow(long windowId) + { + if (embeddableQueryInfoProvider != null) { + embeddableQueryInfoProvider.beginWindow(windowId); + } + } + + @Override + public void endWindow() + { + if (embeddableQueryInfoProvider != null) { + embeddableQueryInfoProvider.endWindow(); + } + } + + @Override + public void teardown() + { + if (embeddableQueryInfoProvider != null) { + embeddableQueryInfoProvider.teardown(); + } + } + + @Override + public void deactivate() + { + if (embeddableQueryInfoProvider != null) { + embeddableQueryInfoProvider.deactivate(); + } + } + + @Override + public AppData.EmbeddableQueryInfoProvider<QueryType> getEmbeddableQueryInfoProvider() + { + return embeddableQueryInfoProvider; + } + + @Override + public void setEmbeddableQueryInfoProvider(AppData.EmbeddableQueryInfoProvider<QueryType> embeddableQueryInfoProvider) + { + this.embeddableQueryInfoProvider = Preconditions.checkNotNull(embeddableQueryInfoProvider); + } + + private static final Logger LOG = LoggerFactory.getLogger(AbstractAppDataServer.class); + +} -- To stop receiving notification emails like this one, please contact "commits@apex.apache.org" <commits@apex.apache.org>.