Repository: incubator-apex-malhar Updated Branches: refs/heads/devel-3 16b198109 -> f1fee8f9e
MLHR-1940 #comment change the util method name Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/1abdb743 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/1abdb743 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/1abdb743 Branch: refs/heads/devel-3 Commit: 1abdb743496cbef7a6b7af7f0e7653076c1b4161 Parents: 365237d Author: Timothy Farkas <[email protected]> Authored: Sat Dec 12 19:51:31 2015 -0800 Committer: Chandni Singh <[email protected]> Committed: Sun Dec 13 00:22:04 2015 -0800 ---------------------------------------------------------------------- .../datatorrent/lib/util/time/WindowUtils.java | 100 ++++++++++++++++ .../lib/util/time/WindowUtilsTest.java | 113 +++++++++++++++++++ 2 files changed, 213 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/1abdb743/library/src/main/java/com/datatorrent/lib/util/time/WindowUtils.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/util/time/WindowUtils.java b/library/src/main/java/com/datatorrent/lib/util/time/WindowUtils.java new file mode 100644 index 0000000..16a4dfa --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/util/time/WindowUtils.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.util.time; + +import java.math.BigDecimal; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +import com.datatorrent.api.Context.DAGContext; +import com.datatorrent.api.Context.OperatorContext; + +/** + * This is a class which holds utility functions that can be used by operators to perform common + * tasks. + */ +public class WindowUtils +{ + /** + * Computes the duration of an Operator's application window in milliseconds. + * @param context The context of the Operator whose application window duration is + * being computed. + * @return The duration of an Operator's application window in milliseconds. + */ + public static long getAppWindowDurationMs(OperatorContext context) + { + return context.getValue(DAGContext.STREAMING_WINDOW_SIZE_MILLIS).longValue() * + context.getValue(OperatorContext.APPLICATION_WINDOW_COUNT).longValue(); + } + + /** + * Computes the number of application windows that will fit into the given + * millisecond duration. This method rounds the number of application windows up. + * @param context The context of the Operator for which the number of application + * windows is being computed. + * @param millis The millisecond duration to compute the number of application windows for. + * @return The number of application windows that will fit into the given millisecond duration. 
+ */ + public static long msToAppWindowCount(OperatorContext context, long millis) + { + Preconditions.checkArgument(millis > 0); + long appWindowDurationMS = getAppWindowDurationMs(context); + long appWindowCount = millis / appWindowDurationMS; + + if (millis % appWindowDurationMS != 0) { + appWindowCount++; + } + + return appWindowCount; + } + + /** + * Converts tuples per second into tuples per application window. The value for + * tuples per application window is rounded up. + * @param context The context of the Operator for which tuples per application window is + * being computed. + * @param tuplesPerSecond Tuples per second. + * @return Tuples per application window. + */ + public static long tpsToTpw(OperatorContext context, long tuplesPerSecond) + { + Preconditions.checkArgument(tuplesPerSecond > 0); + BigDecimal tuplesPerWindow = new BigDecimal(getAppWindowDurationMs(context)); + tuplesPerWindow = tuplesPerWindow.divide(new BigDecimal(1000)); + tuplesPerWindow = tuplesPerWindow.multiply(new BigDecimal(tuplesPerSecond)); + + Preconditions.checkArgument(tuplesPerWindow.compareTo(new BigDecimal(Long.MAX_VALUE)) <= 0, + "Overflow computing tuples per window."); + + tuplesPerWindow = tuplesPerWindow.stripTrailingZeros(); + long tuplesPerWindowLong = tuplesPerWindow.longValue(); + + if (tuplesPerWindow.scale() > 0) { + LOG.debug("{}", tuplesPerWindow); + tuplesPerWindowLong++; + } + + return tuplesPerWindowLong; + } + + private static final Logger LOG = LoggerFactory.getLogger(WindowUtils.class); +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/1abdb743/library/src/test/java/com/datatorrent/lib/util/time/WindowUtilsTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/util/time/WindowUtilsTest.java b/library/src/test/java/com/datatorrent/lib/util/time/WindowUtilsTest.java new file mode 100644 index 0000000..d6965f7 --- /dev/null +++ 
b/library/src/test/java/com/datatorrent/lib/util/time/WindowUtilsTest.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.util.time; + +import java.math.BigDecimal; + +import org.junit.Assert; +import org.junit.Test; + +import com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap; +import com.datatorrent.api.Context.DAGContext; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.lib.helper.OperatorContextTestHelper.TestIdOperatorContext; + +public class WindowUtilsTest +{ + @Test + public void getAppWindowDurationMSTest() + { + OperatorContext context = createOperatorContext(500, 10); + long appWindowDuration = WindowUtils.getAppWindowDurationMs(context); + Assert.assertEquals(5000L, appWindowDuration); + } + + @Test + public void getAppWindowDurationMSOverflowTest() + { + OperatorContext context = createOperatorContext(Integer.MAX_VALUE, Integer.MAX_VALUE); + long appWindowDuration = WindowUtils.getAppWindowDurationMs(context); + + Assert.assertEquals(new BigDecimal(Integer.MAX_VALUE).multiply(new BigDecimal(Integer.MAX_VALUE)), + new BigDecimal(appWindowDuration)); + } + + @Test + public void msToAppWindowCountSimpleTest() + 
{ + OperatorContext context = createOperatorContext(500, 10); + long appWindowCount = WindowUtils.msToAppWindowCount(context, 10000L); + Assert.assertEquals(2, appWindowCount); + + appWindowCount = WindowUtils.msToAppWindowCount(context, 10001L); + Assert.assertEquals(3, appWindowCount); + } + + @Test + public void msToAppWindowCountRoundingTest() + { + OperatorContext context = createOperatorContext(500, 10); + long appWindowCount = WindowUtils.msToAppWindowCount(context, 10001L); + Assert.assertEquals(3, appWindowCount); + } + + @Test + public void tpsToTpwSimpleTest() + { + OperatorContext context = createOperatorContext(100, 1); + + long tuplesPerWindow = WindowUtils.tpsToTpw(context, 500L); + Assert.assertEquals(50L, tuplesPerWindow); + + } + + @Test + public void tpsToTpwRoundingTest() + { + OperatorContext context = createOperatorContext(100, 1); + + long tuplesPerWindow = WindowUtils.tpsToTpw(context, 501L); + Assert.assertEquals(51L, tuplesPerWindow); + } + + @Test + public void tpsToTpwOverflowTest() + { + OperatorContext context = createOperatorContext(Integer.MAX_VALUE, 1); + + boolean overflowException = false; + + try { + WindowUtils.tpsToTpw(context, Long.MAX_VALUE); + } catch (Exception e) { + overflowException = true; + } + + Assert.assertTrue(overflowException); + } + + public static OperatorContext createOperatorContext(int streamingWindowMillis, int appWindowCount) + { + DefaultAttributeMap attributeMap = new DefaultAttributeMap(); + attributeMap.put(DAGContext.STREAMING_WINDOW_SIZE_MILLIS, streamingWindowMillis); + attributeMap.put(OperatorContext.APPLICATION_WINDOW_COUNT, appWindowCount); + + return new TestIdOperatorContext(1, attributeMap); + } +}
