Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/devel-3 16b198109 -> f1fee8f9e


MLHR-1940 #comment change the util method name


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/1abdb743
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/1abdb743
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/1abdb743

Branch: refs/heads/devel-3
Commit: 1abdb743496cbef7a6b7af7f0e7653076c1b4161
Parents: 365237d
Author: Timothy Farkas <[email protected]>
Authored: Sat Dec 12 19:51:31 2015 -0800
Committer: Chandni Singh <[email protected]>
Committed: Sun Dec 13 00:22:04 2015 -0800

----------------------------------------------------------------------
 .../datatorrent/lib/util/time/WindowUtils.java  | 100 ++++++++++++++++
 .../lib/util/time/WindowUtilsTest.java          | 113 +++++++++++++++++++
 2 files changed, 213 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/1abdb743/library/src/main/java/com/datatorrent/lib/util/time/WindowUtils.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/util/time/WindowUtils.java 
b/library/src/main/java/com/datatorrent/lib/util/time/WindowUtils.java
new file mode 100644
index 0000000..16a4dfa
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/util/time/WindowUtils.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.util.time;
+
+import java.math.BigDecimal;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.Context.DAGContext;
+import com.datatorrent.api.Context.OperatorContext;
+
+/**
+ * This a is class which holds utility functions that can be used by operators 
to perform common
+ * tasks.
+ */
+public class WindowUtils
+{
+  /**
+   * Computes the duration of an Operator's application window in milliseconds.
+   * @param context The context of the Operator whose application window 
duration is
+   * being computed.
+   * @return The duration of an Operator's application window in milliseconds.
+   */
+  public static long getAppWindowDurationMs(OperatorContext context)
+  {
+    return 
context.getValue(DAGContext.STREAMING_WINDOW_SIZE_MILLIS).longValue() *
+           
context.getValue(OperatorContext.APPLICATION_WINDOW_COUNT).longValue();
+  }
+
+  /**
+   * Computes the number of application windows that will fit into the given
+   * millisecond duration. This method rounds the number of application 
windows up.
+   * @param context The context of the Operator for which the number of 
application
+   * windows is being computed.
+   * @param millis The millisecond duration to compute the number of 
application windows for.
+   * @return The number of application windows that will fit into the given 
millisecond duration.
+   */
+  public static long msToAppWindowCount(OperatorContext context, long millis)
+  {
+    Preconditions.checkArgument(millis > 0);
+    long appWindowDurationMS = getAppWindowDurationMs(context);
+    long appWindowCount = millis / appWindowDurationMS;
+
+    if (millis % appWindowDurationMS != 0) {
+      appWindowCount++;
+    }
+
+    return appWindowCount;
+  }
+
+  /**
+   * Converts tuples per second into tuples per application window. The value 
for
+   * tuples per application window is rounded up.
+   * @param context The context of the Operator for which tuples per 
application window is
+   * being computed.
+   * @param tuplesPerSecond Tuples per second.
+   * @return Tuples per application window.
+   */
+  public static long tpsToTpw(OperatorContext context, long tuplesPerSecond)
+  {
+    Preconditions.checkArgument(tuplesPerSecond > 0);
+    BigDecimal tuplesPerWindow = new 
BigDecimal(getAppWindowDurationMs(context));
+    tuplesPerWindow = tuplesPerWindow.divide(new BigDecimal(1000));
+    tuplesPerWindow = tuplesPerWindow.multiply(new 
BigDecimal(tuplesPerSecond));
+
+    Preconditions.checkArgument(tuplesPerWindow.compareTo(new 
BigDecimal(Long.MAX_VALUE)) <= 0,
+        "Overflow computing tuples per window.");
+
+    tuplesPerWindow = tuplesPerWindow.stripTrailingZeros();
+    long tuplesPerWindowLong = tuplesPerWindow.longValue();
+
+    if (tuplesPerWindow.scale() > 0) {
+      LOG.debug("{}", tuplesPerWindow);
+      tuplesPerWindowLong++;
+    }
+
+    return tuplesPerWindowLong;
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(WindowUtils.class);
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/1abdb743/library/src/test/java/com/datatorrent/lib/util/time/WindowUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/com/datatorrent/lib/util/time/WindowUtilsTest.java 
b/library/src/test/java/com/datatorrent/lib/util/time/WindowUtilsTest.java
new file mode 100644
index 0000000..d6965f7
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/util/time/WindowUtilsTest.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.util.time;
+
+import java.math.BigDecimal;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap;
+import com.datatorrent.api.Context.DAGContext;
+import com.datatorrent.api.Context.OperatorContext;
+import 
com.datatorrent.lib.helper.OperatorContextTestHelper.TestIdOperatorContext;
+
+public class WindowUtilsTest
+{
+  @Test
+  public void getAppWindowDurationMSTest()
+  {
+    OperatorContext context = createOperatorContext(500, 10);
+    long appWindowDuration = WindowUtils.getAppWindowDurationMs(context);
+    Assert.assertEquals(5000L, appWindowDuration);
+  }
+
+  @Test
+  public void getAppWindowDurationMSOverflowTest()
+  {
+    OperatorContext context = createOperatorContext(Integer.MAX_VALUE, 
Integer.MAX_VALUE);
+    long appWindowDuration = WindowUtils.getAppWindowDurationMs(context);
+
+    Assert.assertEquals(new BigDecimal(Integer.MAX_VALUE).multiply(new 
BigDecimal(Integer.MAX_VALUE)),
+        new BigDecimal(appWindowDuration));
+  }
+
+  @Test
+  public void msToAppWindowCountSimpleTest()
+  {
+    OperatorContext context = createOperatorContext(500, 10);
+    long appWindowCount = WindowUtils.msToAppWindowCount(context, 10000L);
+    Assert.assertEquals(2, appWindowCount);
+
+    appWindowCount = WindowUtils.msToAppWindowCount(context, 10001L);
+    Assert.assertEquals(3, appWindowCount);
+  }
+
+  @Test
+  public void msToAppWindowCountRoundingTest()
+  {
+    OperatorContext context = createOperatorContext(500, 10);
+    long appWindowCount = WindowUtils.msToAppWindowCount(context, 10001L);
+    Assert.assertEquals(3, appWindowCount);
+  }
+
+  @Test
+  public void tpsToTpwSimpleTest()
+  {
+    OperatorContext context = createOperatorContext(100, 1);
+
+    long tuplesPerWindow = WindowUtils.tpsToTpw(context, 500L);
+    Assert.assertEquals(50L, tuplesPerWindow);
+
+  }
+
+  @Test
+  public void tpsToTpwRoundingTest()
+  {
+    OperatorContext context = createOperatorContext(100, 1);
+
+    long tuplesPerWindow = WindowUtils.tpsToTpw(context, 501L);
+    Assert.assertEquals(51L, tuplesPerWindow);
+  }
+
+  @Test
+  public void tpsToTpwOverflowTest()
+  {
+    OperatorContext context = createOperatorContext(Integer.MAX_VALUE, 1);
+
+    boolean overflowException = false;
+
+    try {
+      WindowUtils.tpsToTpw(context, Long.MAX_VALUE);
+    } catch (Exception e) {
+      overflowException = true;
+    }
+
+    Assert.assertTrue(overflowException);
+  }
+
+  public static OperatorContext createOperatorContext(int 
streamingWindowMillis, int appWindowCount)
+  {
+    DefaultAttributeMap attributeMap = new DefaultAttributeMap();
+    attributeMap.put(DAGContext.STREAMING_WINDOW_SIZE_MILLIS, 
streamingWindowMillis);
+    attributeMap.put(OperatorContext.APPLICATION_WINDOW_COUNT, appWindowCount);
+
+    return new TestIdOperatorContext(1, attributeMap);
+  }
+}

Reply via email to