[
https://issues.apache.org/jira/browse/APEXMALHAR-2100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15405743#comment-15405743
]
ASF GitHub Bot commented on APEXMALHAR-2100:
--------------------------------------------
Github user chinmaykolhatkar commented on a diff in the pull request:
https://github.com/apache/apex-malhar/pull/330#discussion_r73320461
--- Diff:
library/src/main/java/com/datatorrent/lib/join/POJOInnerJoinOperator.java ---
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.join;
+
+import java.lang.reflect.Array;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.ClassUtils;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * Concrete implementation of AbstractManagedStateInnerJoinOperator and
receives objects from both streams.
+ *
+ * @displayName POJO Inner Join Operator
+ * @tags join
+ */
+public class POJOInnerJoinOperator extends
AbstractManagedStateInnerJoinOperator<Object,Object> implements
Operator.ActivationListener<Context>
+{
+ private static final transient Logger LOG =
LoggerFactory.getLogger(POJOInnerJoinOperator.class);
+ private transient long timeIncrement;
+ private transient FieldObjectMap[] inputFieldObjects =
(FieldObjectMap[])Array.newInstance(FieldObjectMap.class, 2);
+ protected Class<?> outputClass;
+ private long time = System.currentTimeMillis();
+
+ @OutputPortFieldAnnotation(schemaRequired = true)
+ public final transient DefaultOutputPort<Object> outputPort = new
DefaultOutputPort<Object>()
+ {
+ @Override
+ public void setup(Context.PortContext context)
+ {
+ outputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
+ }
+ };
+
+ @InputPortFieldAnnotation(schemaRequired = true)
+ public transient DefaultInputPort<Object> input1 = new
DefaultInputPort<Object>()
+ {
+ @Override
+ public void setup(Context.PortContext context)
+ {
+ inputFieldObjects[0].inputClass =
context.getValue(Context.PortContext.TUPLE_CLASS);
+ }
+
+ @Override
+ public void process(Object tuple)
+ {
+ processTuple(tuple,true);
+ }
+
+ @Override
+ public StreamCodec<Object> getStreamCodec()
+ {
+ return getInnerJoinStreamCodec(true);
+ }
+ };
+
+ @InputPortFieldAnnotation(schemaRequired = true)
+ public transient DefaultInputPort<Object> input2 = new
DefaultInputPort<Object>()
+ {
+ @Override
+ public void setup(Context.PortContext context)
+ {
+ inputFieldObjects[1].inputClass =
context.getValue(Context.PortContext.TUPLE_CLASS);
+ }
+
+ @Override
+ public void process(Object tuple)
+ {
+ processTuple(tuple,false);
+ }
+
+ @Override
+ public StreamCodec<Object> getStreamCodec()
+ {
+ return getInnerJoinStreamCodec(false);
+ }
+ };
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ timeIncrement =
context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) *
+ context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS);
+ super.setup(context);
+ for (int i = 0; i < 2; i++) {
+ inputFieldObjects[i] = new FieldObjectMap();
+ }
+ }
+
+ /**
+ * Extract the time value from the given tuple
+ * @param tuple given tuple
+ * @param isStream1Data Specifies whether the given tuple belongs to
stream1 or not.
+ * @return
+ */
+ @Override
+ public long extractTime(Object tuple, boolean isStream1Data)
+ {
+ return timeFields == null ? time : (long)(isStream1Data ?
inputFieldObjects[0].timeFieldGet.get(tuple) :
+ inputFieldObjects[1].timeFieldGet.get(tuple));
+ }
+
+ /**
+ * Create getters for the key and time fields and setters for the
include fields.
+ */
+ protected void generateSettersAndGetters()
+ {
+ for (int i = 0; i < 2; i++) {
+ Class inputClass = inputFieldObjects[i].inputClass;
+ try {
+ Class c =
ClassUtils.primitiveToWrapper(inputClass.getField(keyFields.get(i)).getType());
+ inputFieldObjects[i].keyGet = PojoUtils.createGetter(inputClass,
keyFields.get(i), c);
+ if (timeFields != null && timeFields.size() != 0) {
+ Class tc =
ClassUtils.primitiveToWrapper(inputClass.getField(timeFields.get(i)).getType());
+ inputFieldObjects[i].timeFieldGet =
PojoUtils.createGetter(inputClass, timeFields.get(i), tc);
+ }
+ for (int j = 0; j < includeFields[i].length; j++) {
+ Class ic =
ClassUtils.primitiveToWrapper(inputClass.getField(includeFields[i][j]).getType());
+ Class oc =
ClassUtils.primitiveToWrapper(outputClass.getField(includeFields[i][j]).getType());
+
inputFieldObjects[i].fieldMap.put(PojoUtils.createGetter(inputClass,
includeFields[i][j], ic),
--- End diff --
I'm not sure that's the best behaviour, because when both streams include a field with the same name, it's really unclear which stream's value should overwrite the other's in the output. Is there any other approach that would work here?
> Development of Inner Join Operator using Spillable Datastructures
> -----------------------------------------------------------------
>
> Key: APEXMALHAR-2100
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2100
> Project: Apache Apex Malhar
> Issue Type: Task
> Reporter: Chaitanya
> Assignee: Chaitanya
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)