This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-malhar.git


The following commit(s) were added to refs/heads/master by this push:
     new 783b7fe  APEXMALHAR-2530 Refactored AbstractAppDataSnapshotServer so that subclasses don't need schemas
783b7fe is described below

commit 783b7fe3a1d9dd9c3ef7803e049d7d317b2aefd1
Author: David Yan <david...@apache.org>
AuthorDate: Sat Jul 29 23:33:54 2017 -0700

    APEXMALHAR-2530 Refactored AbstractAppDataSnapshotServer so that subclasses don't need schemas
---
 .../snapshot/AbstractAppDataSnapshotServer.java    |  35 +-----
 .../lib/io/PubSubWebSocketAppDataQuery.java        |  14 ---
 .../lib/io/PubSubWebSocketAppDataResult.java       |  16 +--
 .../malhar/lib/appdata/AbstractAppDataServer.java  | 125 +++++++++++++++++++++
 4 files changed, 139 insertions(+), 51 deletions(-)

diff --git 
a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
 
b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
index 19e142b..88d89c4 100644
--- 
a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
+++ 
b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java
@@ -28,6 +28,7 @@ import javax.validation.constraints.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.malhar.lib.appdata.AbstractAppDataServer;
 import org.apache.commons.lang3.mutable.MutableLong;
 
 import com.google.common.base.Preconditions;
@@ -36,11 +37,9 @@ import com.google.common.collect.Lists;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.Operator;
 import com.datatorrent.api.annotation.InputPortFieldAnnotation;
 import com.datatorrent.common.experimental.AppData;
 import com.datatorrent.common.experimental.AppData.EmbeddableQueryInfoProvider;
-import com.datatorrent.lib.appdata.StoreUtils;
 import com.datatorrent.lib.appdata.gpo.GPOMutable;
 import com.datatorrent.lib.appdata.query.AppDataWindowEndQueueManager;
 import com.datatorrent.lib.appdata.query.QueryExecutor;
@@ -67,7 +66,7 @@ import com.datatorrent.lib.appdata.schemas.SnapshotSchema;
  * @param <INPUT_EVENT> The type of the input events that the operator accepts.
  * @since 3.0.0
  */
-public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements 
Operator, AppData.Store<String>
+public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> extends 
AbstractAppDataServer<String>
 {
   /**
    * The {@link QueryManagerSynchronous} for the operator.
@@ -198,6 +197,7 @@ public abstract class 
AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
   @Override
   public void setup(OperatorContext context)
   {
+    super.setup(context);
     setupSchema();
 
     schemaRegistry = new SchemaRegistrySingle(schema);
@@ -209,13 +209,6 @@ public abstract class 
AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
     queryDeserializerFactory.setContext(DataQuerySnapshot.class, 
schemaRegistry);
     resultSerializerFactory = new MessageSerializerFactory(resultFormatter);
     queryProcessor.setup(context);
-
-    if (embeddableQueryInfoProvider != null) {
-      embeddableQueryInfoProvider.enableEmbeddedMode();
-      LOG.info("An embeddable query operator is being used of class {}.", 
embeddableQueryInfoProvider.getClass().getName());
-      
StoreUtils.attachOutputPortToInputPort(embeddableQueryInfoProvider.getOutputPort(),
 query);
-      embeddableQueryInfoProvider.setup(context);
-    }
   }
 
   protected void setupSchema()
@@ -235,19 +228,14 @@ public abstract class 
AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
   @Override
   public void beginWindow(long windowId)
   {
-    if (embeddableQueryInfoProvider != null) {
-      embeddableQueryInfoProvider.beginWindow(windowId);
-    }
-
+    super.beginWindow(windowId);
     queryProcessor.beginWindow(windowId);
   }
 
   @Override
   public void endWindow()
   {
-    if (embeddableQueryInfoProvider != null) {
-      embeddableQueryInfoProvider.endWindow();
-    }
+    super.endWindow();
 
     {
       Result result;
@@ -275,21 +263,10 @@ public abstract class 
AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper
   @Override
   public void teardown()
   {
-    if (embeddableQueryInfoProvider != null) {
-      embeddableQueryInfoProvider.teardown();
-    }
-
+    super.teardown();
     queryProcessor.teardown();
   }
 
-  @Override
-  public void deactivate()
-  {
-    if (embeddableQueryInfoProvider != null) {
-      embeddableQueryInfoProvider.deactivate();
-    }
-  }
-
   /**
    * Gets the JSON for the schema.
    * @return the JSON for the schema.
diff --git 
a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java 
b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java
index 7cf883f..3f2029e 100644
--- 
a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java
+++ 
b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.java
@@ -23,7 +23,6 @@ import java.net.URISyntaxException;
 
 import javax.validation.constraints.Min;
 
-import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
@@ -158,19 +157,6 @@ public class PubSubWebSocketAppDataQuery extends 
PubSubWebSocketInputOperator<St
 
     try {
       JSONObject jo = new JSONObject(message);
-      JSONArray ja = jo.names();
-
-      //Make sure that only the correct keys are in the first level of JSON
-      for (int keyIndex = 0; keyIndex < ja.length(); keyIndex++) {
-        String key = ja.getString(keyIndex);
-        if (!(PubSubMessage.DATA_KEY.equals(key) ||
-            PubSubMessage.TOPIC_KEY.equals(key) ||
-            PubSubMessage.TYPE_KEY.equals(key))) {
-          logger.error("{} is not a valid key in the first level of the 
following pubsub message:\n{}", key, message);
-          return null;
-        }
-      }
-
       data = jo.getString(PubSubMessage.DATA_KEY);
     } catch (JSONException e) {
       return null;
diff --git 
a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResult.java
 
b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResult.java
index e1f3fa1..7800fa4 100644
--- 
a/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResult.java
+++ 
b/library/src/main/java/com/datatorrent/lib/io/PubSubWebSocketAppDataResult.java
@@ -97,16 +97,16 @@ public class PubSubWebSocketAppDataResult extends 
PubSubWebSocketOutputOperator<
       throw new RuntimeException(ex);
     }
 
-    String id;
-
-    try {
-      id = jo.getString("id");
-    } catch (JSONException ex) {
-      throw new RuntimeException(ex);
+    String topic = getTopic();
+
+    if (jo.has("id")) {
+      try {
+        topic += "." + jo.getString("id");
+      } catch (JSONException ex) {
+        throw new RuntimeException(ex);
+      }
     }
 
-    String topic = getTopic() + "." + id;
-
     JSONObject output = new JSONObject();
 
     try {
diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/appdata/AbstractAppDataServer.java
 
b/library/src/main/java/org/apache/apex/malhar/lib/appdata/AbstractAppDataServer.java
new file mode 100644
index 0000000..4b57d66
--- /dev/null
+++ 
b/library/src/main/java/org/apache/apex/malhar/lib/appdata/AbstractAppDataServer.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.appdata;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.common.experimental.AppData;
+import com.datatorrent.lib.appdata.StoreUtils;
+
+/**
+ * This is an operator that lays the framework of serving data to queries coming from an embeddable query info provider,
+ * which may be an input operator.
+ * Subclasses are expected to implement the processQuery method for the logic of handling the query. Note that
+ * processQuery cannot directly emit to the operator's output port because it's called from the thread of the
+ * embeddable query info provider.
+ */
+public abstract class AbstractAppDataServer<QueryType> implements Operator, 
AppData.Store<QueryType>
+{
+  @AppData.QueryPort
+  @InputPortFieldAnnotation(optional = true)
+  public final transient DefaultInputPort<QueryType> query = new 
DefaultInputPort<QueryType>()
+  {
+    @Override
+    public void process(QueryType query)
+    {
+      processQuery(query);
+    }
+  };
+
+
+  protected AppData.EmbeddableQueryInfoProvider<QueryType> 
embeddableQueryInfoProvider;
+
+
+  protected abstract void processQuery(QueryType query);
+
+  @Override
+  public void activate(Context.OperatorContext ctx)
+  {
+    if (embeddableQueryInfoProvider != null) {
+      embeddableQueryInfoProvider.activate(ctx);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    if (embeddableQueryInfoProvider != null) {
+      embeddableQueryInfoProvider.enableEmbeddedMode();
+      LOG.info("An embeddable query operator is being used of class {}.", 
embeddableQueryInfoProvider.getClass().getName());
+      
StoreUtils.attachOutputPortToInputPort(embeddableQueryInfoProvider.getOutputPort(),
 query);
+      embeddableQueryInfoProvider.setup(context);
+    }
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    if (embeddableQueryInfoProvider != null) {
+      embeddableQueryInfoProvider.beginWindow(windowId);
+    }
+  }
+
+  @Override
+  public void endWindow()
+  {
+    if (embeddableQueryInfoProvider != null) {
+      embeddableQueryInfoProvider.endWindow();
+    }
+  }
+
+  @Override
+  public void teardown()
+  {
+    if (embeddableQueryInfoProvider != null) {
+      embeddableQueryInfoProvider.teardown();
+    }
+  }
+
+  @Override
+  public void deactivate()
+  {
+    if (embeddableQueryInfoProvider != null) {
+      embeddableQueryInfoProvider.deactivate();
+    }
+  }
+
+  @Override
+  public AppData.EmbeddableQueryInfoProvider<QueryType> 
getEmbeddableQueryInfoProvider()
+  {
+    return embeddableQueryInfoProvider;
+  }
+
+  @Override
+  public void 
setEmbeddableQueryInfoProvider(AppData.EmbeddableQueryInfoProvider<QueryType> 
embeddableQueryInfoProvider)
+  {
+    this.embeddableQueryInfoProvider = 
Preconditions.checkNotNull(embeddableQueryInfoProvider);
+  }
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(AbstractAppDataServer.class);
+
+}

-- 
To stop receiving notification emails like this one, please contact
"commits@apex.apache.org" <commits@apex.apache.org>.

Reply via email to