Repository: incubator-apex-malhar Updated Branches: refs/heads/devel-3 658118a6f -> e0338537c
MLHR-1906 #resolve #comment Snapshot Server support tags Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/3738eccb Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/3738eccb Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/3738eccb Branch: refs/heads/devel-3 Commit: 3738eccbad5a7734ca98496c632a65dea40112e5 Parents: 7803359 Author: bright <[email protected]> Authored: Mon Nov 16 16:16:18 2015 -0800 Committer: bright <[email protected]> Committed: Tue Nov 17 11:37:23 2015 -0800 ---------------------------------------------------------------------- .../datatorrent/lib/appdata/schemas/Schema.java | 3 +- .../lib/appdata/schemas/SnapshotSchema.java | 24 +++- .../snapshot/AbstractAppDataSnapshotServer.java | 38 +++++-- .../AppDataSnapshotServerTagsSupportTest.java | 111 +++++++++++++++++++ .../satisfactionRatingSnapshotSchema_test.json | 8 ++ ...actionRatingWithTagsSnapshotSchema_test.json | 9 ++ 6 files changed, 178 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3738eccb/library/src/main/java/com/datatorrent/lib/appdata/schemas/Schema.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/Schema.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/Schema.java index f4dd1a2..e2e21da 100644 --- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/Schema.java +++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/Schema.java @@ -33,7 +33,8 @@ public interface Schema public static final String FIELD_SCHEMA_KEYS = "schemaKeys"; public static final String FIELD_SCHEMA = "schema"; - + public static final String FIELD_SCHEMA_TAGS = "tags"; + /** * The id of the schema. This is relevant for operators which support serving multiple schemas, * in which each schema will need a unique id. http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3738eccb/library/src/main/java/com/datatorrent/lib/appdata/schemas/SnapshotSchema.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SnapshotSchema.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SnapshotSchema.java index c2c6eb2..aed9013 100644 --- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SnapshotSchema.java +++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SnapshotSchema.java @@ -20,6 +20,7 @@ package com.datatorrent.lib.appdata.schemas; import java.util.Collections; import java.util.Map; +import java.util.Set; import com.google.common.base.Preconditions; import com.google.common.collect.Maps; @@ -198,12 +199,6 @@ public class SnapshotSchema implements Schema { schema = new JSONObject(schemaJSON); - Preconditions.checkState(schema.length() == NUM_KEYS_FIRST_LEVEL, - "Expected " - + NUM_KEYS_FIRST_LEVEL - + " keys in the first level but found " - + schema.length()); - if(schemaKeys != null) { schema.put(Schema.FIELD_SCHEMA_KEYS, SchemaUtils.createJSONObject(schemaKeys)); @@ -246,6 +241,23 @@ public class SnapshotSchema implements Schema schemaJSON = this.schema.toString(); } + public void setTags(Set<String> tags) + { + if (tags == null || tags.isEmpty()) + throw new IllegalArgumentException("tags can't be null or empty."); + + try { + JSONArray tagArray = new JSONArray(tags); + + schema.put(FIELD_SCHEMA_TAGS, tagArray); + } catch (JSONException e) { + Preconditions.checkState(false, e.getMessage()); + throw new RuntimeException(e); + } + + schemaJSON = schema.toString(); + } + /** * This is a helper method which sets the JSON that represents this schema. * @param schemaJSON The JSON that represents this schema. http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3738eccb/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java index ded099b..236735f 100644 --- a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java +++ b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java @@ -20,6 +20,7 @@ package com.datatorrent.lib.appdata.snapshot; import java.io.IOException; import java.util.List; +import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import javax.validation.constraints.NotNull; @@ -79,25 +80,25 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper /** * The {@link MessageSerializerFactory} for the operator. */ - private transient MessageSerializerFactory resultSerializerFactory; + protected transient MessageSerializerFactory resultSerializerFactory; /** * The {@link SchemaRegistry} for the operator. */ - private transient SchemaRegistry schemaRegistry; + protected transient SchemaRegistry schemaRegistry; /** * The schema for the operator. */ protected transient SnapshotSchema schema; @NotNull - private ResultFormatter resultFormatter = new ResultFormatter(); - private String snapshotSchemaJSON; + protected ResultFormatter resultFormatter = new ResultFormatter(); + protected String snapshotSchemaJSON; /** * The current data to be served by the operator. */ protected List<GPOMutable> currentData = Lists.newArrayList(); - private EmbeddableQueryInfoProvider<String> embeddableQueryInfoProvider; - private final transient ConcurrentLinkedQueue<SchemaResult> schemaQueue = new ConcurrentLinkedQueue<>(); + protected EmbeddableQueryInfoProvider<String> embeddableQueryInfoProvider; + protected final transient ConcurrentLinkedQueue<SchemaResult> schemaQueue = new ConcurrentLinkedQueue<>(); @AppData.ResultPort public final transient DefaultOutputPort<String> queryResult = new DefaultOutputPort<>(); @@ -106,6 +107,8 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper * The queryExecutor execute the query and return the result. */ protected QueryExecutor<Query, Void, MutableLong, Result> queryExecutor; + + private Set<String> tags; @AppData.QueryPort @InputPortFieldAnnotation(optional=true) @@ -195,7 +198,8 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper @Override public void setup(OperatorContext context) { - schema = new SnapshotSchema(snapshotSchemaJSON); + setupSchema(); + schemaRegistry = new SchemaRegistrySingle(schema); //Setup for query processing setupQueryProcessor(); @@ -214,7 +218,14 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper embeddableQueryInfoProvider.setup(context); } } - + + protected void setupSchema() + { + schema = new SnapshotSchema(snapshotSchemaJSON); + if (tags != null && !tags.isEmpty()) + schema.setTags(tags); + } + protected void setupQueryProcessor() { queryProcessor = QueryManagerSynchronous.newInstance(queryExecutor == null ? new SnapshotComputer() : queryExecutor, @@ -357,5 +368,16 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper { return currentData; } + + public Set<String> getTags() + { + return tags; + } + + public void setTags(Set<String> tags) + { + this.tags = tags; + } + } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3738eccb/library/src/test/java/com/datatorrent/lib/appdata/snapshot/AppDataSnapshotServerTagsSupportTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/appdata/snapshot/AppDataSnapshotServerTagsSupportTest.java b/library/src/test/java/com/datatorrent/lib/appdata/snapshot/AppDataSnapshotServerTagsSupportTest.java new file mode 100644 index 0000000..5ae882c --- /dev/null +++ b/library/src/test/java/com/datatorrent/lib/appdata/snapshot/AppDataSnapshotServerTagsSupportTest.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.appdata.snapshot; + +import org.codehaus.jettison.json.JSONArray; +import org.codehaus.jettison.json.JSONObject; +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.collect.Sets; + +import com.datatorrent.lib.appdata.gpo.GPOMutable; +import com.datatorrent.lib.appdata.schemas.Schema; +import com.datatorrent.lib.appdata.schemas.SchemaResult; +import com.datatorrent.lib.appdata.schemas.SchemaUtils; + +public class AppDataSnapshotServerTagsSupportTest +{ + public static class AppDataSnapshotServerSchemaExport extends AbstractAppDataSnapshotServer<Object> + { + SchemaResult schemaResult; + String schemaResultJSON; + + @Override + public GPOMutable convert(Object inputEvent) + { + return null; + } + + @Override + public void endWindow() + { + while ((schemaResult = schemaQueue.poll()) != null) { + schemaResultJSON = resultSerializerFactory.serialize(schemaResult); + queryResult.emit(schemaResultJSON); + } + + queryProcessor.endWindow(); + } + } + + private static final String schemaLocation = "satisfactionRatingSnapshotSchema_test.json"; + private static final String TAG = "bulletin"; + + private static final String schemaWithTagsLocation = "satisfactionRatingWithTagsSnapshotSchema_test.json"; + + @Test + public void testSchema() throws Exception + { + AppDataSnapshotServerSchemaExport[] snapshotServers = getSnapshotServers(); + + for (AppDataSnapshotServerSchemaExport snapshotServer : snapshotServers) { + snapshotServer.setup(null); + + snapshotServer.processQuery("{\"id\":123, \"type\":\"schemaQuery\"}"); + snapshotServer.beginWindow(0L); + snapshotServer.endWindow(); + + String result = snapshotServer.schemaResultJSON; + + JSONObject json = new JSONObject(result); + JSONObject jsonData = (JSONObject)json.getJSONArray("data").get(0); + JSONArray tags = jsonData.getJSONArray(Schema.FIELD_SCHEMA_TAGS); + Assert.assertTrue("No tags", tags != null); + Assert.assertEquals("Invalid tag.", tags.get(0), TAG); + } + } + + protected AppDataSnapshotServerSchemaExport[] getSnapshotServers() + { + AppDataSnapshotServerSchemaExport[] snapshotServers = new AppDataSnapshotServerSchemaExport[2]; + + { + String schema = SchemaUtils.jarResourceFileToString(schemaLocation); + + AppDataSnapshotServerSchemaExport snapshotServer = new AppDataSnapshotServerSchemaExport(); + + snapshotServer.setSnapshotSchemaJSON(schema); + snapshotServer.setTags(Sets.newHashSet(TAG)); + + snapshotServers[0] = snapshotServer; + } + { + String schema = SchemaUtils.jarResourceFileToString(schemaWithTagsLocation); + + AppDataSnapshotServerSchemaExport snapshotServer = new AppDataSnapshotServerSchemaExport(); + + snapshotServer.setSnapshotSchemaJSON(schema); + + snapshotServers[1] = snapshotServer; + } + + return snapshotServers; + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3738eccb/library/src/test/resources/satisfactionRatingSnapshotSchema_test.json ---------------------------------------------------------------------- diff --git a/library/src/test/resources/satisfactionRatingSnapshotSchema_test.json b/library/src/test/resources/satisfactionRatingSnapshotSchema_test.json new file mode 100644 index 0000000..5733e6c --- /dev/null +++ b/library/src/test/resources/satisfactionRatingSnapshotSchema_test.json @@ -0,0 +1,8 @@ +{ + "values": [ + {"name": "current", "type": "long", "tags": ["current"]}, + {"name": "min", "type": "long", "tags": ["min"]}, + {"name": "max", "type": "long", "tags": ["max"]}, + {"name": "threshold", "type": "long", "tags": ["threshold"]} + ] +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3738eccb/library/src/test/resources/satisfactionRatingWithTagsSnapshotSchema_test.json ---------------------------------------------------------------------- diff --git a/library/src/test/resources/satisfactionRatingWithTagsSnapshotSchema_test.json b/library/src/test/resources/satisfactionRatingWithTagsSnapshotSchema_test.json new file mode 100644 index 0000000..7c04db5 --- /dev/null +++ b/library/src/test/resources/satisfactionRatingWithTagsSnapshotSchema_test.json @@ -0,0 +1,9 @@ +{ + "tags": ["bulletin"], + "values": [ + {"name": "current", "type": "long", "tags": ["current"]}, + {"name": "min", "type": "long", "tags": ["min"]}, + {"name": "max", "type": "long", "tags": ["max"]}, + {"name": "threshold", "type": "long", "tags": ["threshold"]} + ] +}
