This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 67fbe86  [FLINK-11876] Introduce new InputSelectable, BoundedOneInput and BoundedMultiInput interfaces for stream operators
67fbe86 is described below

commit 67fbe860d5fd97063634b9f226195d6ce1870bce
Author: sunhaibotb <[email protected]>
AuthorDate: Wed May 29 10:31:39 2019 +0800

    [FLINK-11876] Introduce new InputSelectable, BoundedOneInput and BoundedMultiInput interfaces for stream operators
---
 .../streaming/api/operators/BoundedMultiInput.java |  33 +++++
 .../streaming/api/operators/BoundedOneInput.java   |  32 +++++
 .../streaming/api/operators/InputSelectable.java   |  47 +++++++
 .../streaming/api/operators/InputSelection.java    | 147 +++++++++++++++++++++
 .../api/operators/InputSelectionTest.java          |  95 +++++++++++++
 5 files changed, 354 insertions(+)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BoundedMultiInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BoundedMultiInput.java
new file mode 100755
index 0000000..af1acb4
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BoundedMultiInput.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Interface for multi-input operators that can process the EndOfInput event of each
+ * of their inputs.
+ */
+@PublicEvolving
+public interface BoundedMultiInput {
+
+       /**
+        * Notifies the operator that no more data will arrive on the input identified by
+        * {@code inputId}.
+        *
+        * @param inputId the id of the input that has ended; ids are numbered starting from 1,
+        *     so {@code 1} indicates the first input.
+        * @throws Exception implementations may throw if handling the end of the input fails.
+        */
+       void endInput(int inputId) throws Exception;
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BoundedOneInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BoundedOneInput.java
new file mode 100755
index 0000000..cd35f45
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BoundedOneInput.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Interface for one-input operators that can process the EndOfInput event of their
+ * single input.
+ */
+@PublicEvolving
+public interface BoundedOneInput {
+
+       /**
+        * Notifies the operator that no more data will arrive on the input.
+        *
+        * @throws Exception implementations may throw if handling the end of the input fails.
+        */
+       void endInput() throws Exception;
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InputSelectable.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InputSelectable.java
new file mode 100755
index 0000000..df0fe93
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InputSelectable.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Interface for stream operators that can select the input to get
+ * {@link org.apache.flink.streaming.runtime.streamrecord.StreamRecord}s from.
+ *
+ * <p><b>IMPORTANT:</b> This interface is a loose contract. The runtime may read multiple
+ * records continuously before calling {@code nextSelection()} again to determine whether
+ * to change the input to be read. That is, it is not guaranteed that {@code nextSelection()}
+ * will be called immediately after the operator has processed a record, nor that the input
+ * being read will be changed right away according to the returned {@link InputSelection}.
+ * This means that the operator may receive some data that it does not currently want to
+ * process. Therefore, if an operator needs a strict convention, it must cache the unexpected
+ * data itself and handle it correctly.
+ *
+ * <p>This interface also makes the following conventions:
+ * <ol>
+ *   <li>The runtime must call {@code nextSelection()} to determine the input from which to
+ *       read the first record.
+ *   <li>When the input being read reaches the end, the runtime must call
+ *       {@code nextSelection()} to determine the next input to be read.
+ * </ol>
+ */
+@PublicEvolving
+public interface InputSelectable {
+
+       /**
+        * Returns the next {@link InputSelection}, i.e. the inputs from which the operator
+        * wants to get the next record.
+        *
+        * <p>This method is guaranteed to not be called concurrently with other methods of the
+        * operator.
+        *
+        * @return the selection of inputs to read from next.
+        */
+       InputSelection nextSelection();
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InputSelection.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InputSelection.java
new file mode 100755
index 0000000..5b929f4
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InputSelection.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+
+/**
+ * Describes the selection of inputs that a stream operator wants to read records from.
+ *
+ * <p>The selection is stored as a 64-bit mask in which bit {@code n - 1} stands for the input
+ * with id {@code n} (ids are numbered starting from 1); a mask of {@code -1}, i.e. all bits
+ * set, selects all inputs.
+ */
+@PublicEvolving
+public final class InputSelection implements Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * The {@code InputSelection} instance which indicates to select all inputs.
+        */
+       public static final InputSelection ALL = new InputSelection(-1, true);
+
+       /**
+        * The {@code InputSelection} instance which indicates to select the first input.
+        */
+       public static final InputSelection FIRST = new Builder().select(1).build();
+
+       /**
+        * The {@code InputSelection} instance which indicates to select the second input.
+        */
+       public static final InputSelection SECOND = new Builder().select(2).build();
+
+       /** Bit mask of the selected inputs; bit {@code n - 1} represents the input with id {@code n}. */
+       private final long inputMask;
+
+       /** Precomputed flag telling whether the mask contains both the first and the second input. */
+       private final boolean isALLMaskOf2;
+
+       private InputSelection(long inputMask, boolean isALLMaskOf2) {
+               this.inputMask = inputMask;
+               this.isALLMaskOf2 = isALLMaskOf2;
+       }
+
+       /**
+        * Returns the bit mask of the selected inputs.
+        *
+        * @return the input mask; {@code -1} (all bits set) means that all inputs are selected.
+        */
+       public long getInputMask() {
+               return inputMask;
+       }
+
+       /**
+        * Tells whether or not the input mask includes all of two inputs.
+        *
+        * @return {@code true} if the input mask includes all of two inputs, false otherwise.
+        */
+       public boolean isALLMaskOf2() {
+               return isALLMaskOf2;
+       }
+
+       /**
+        * Fairly select one of the two inputs for reading. When {@code inputMask} includes two
+        * inputs and both inputs are available, alternately select one of them. Otherwise, select
+        * the available one of {@code inputMask}, or return -1 to indicate no input is selected.
+        *
+        * <p>Note that this supports only two inputs for performance reasons.
+        *
+        * @param availableInputsMask The mask of all available inputs.
+        * @param lastReadInputIndex The index of last read input.
+        * @return the index of the input for reading or -1, and -1 indicates no input is selected
+        *         ({@code inputMask} is empty or the inputs in {@code inputMask} are unavailable).
+        * @throws UnsupportedOperationException if the combination of {@code availableInputsMask}
+        *         and {@code inputMask} refers to an input other than the first two.
+        */
+       public int fairSelectNextIndexOutOf2(int availableInputsMask, int lastReadInputIndex) {
+               // Only the low 32 bits of the selection are relevant for the two-input case.
+               int selectionMask = (int) inputMask;
+               int combineMask = availableInputsMask & selectionMask;
+
+               if (combineMask == 3) {
+                       // Both inputs are selected and available: alternate with the last read one.
+                       return lastReadInputIndex == 0 ? 1 : 0;
+               } else if (combineMask >= 0 && combineMask < 3) {
+                       // 0 -> -1 (nothing to read), 1 -> index 0, 2 -> index 1.
+                       return combineMask - 1;
+               }
+
+               throw new UnsupportedOperationException("Only two inputs are supported.");
+       }
+
+       private static boolean isALLMaskOf2(long inputMask) {
+               return (3 & inputMask) == 3;
+       }
+
+       /**
+        * Two selections are equal when their input masks are equal.
+        */
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+
+               InputSelection that = (InputSelection) o;
+               return inputMask == that.inputMask;
+       }
+
+       /**
+        * Hashes the input mask, the only state {@link #equals(Object)} compares, so that equal
+        * selections always have equal hash codes.
+        */
+       @Override
+       public int hashCode() {
+               return Long.hashCode(inputMask);
+       }
+
+       @Override
+       public String toString() {
+               return String.valueOf(inputMask);
+       }
+
+       /**
+        * Utility class for creating {@link InputSelection}.
+        */
+       public static final class Builder {
+
+               private long inputMask = 0;
+
+               /**
+                * Selects an input identified by the given {@code inputId}.
+                *
+                * @param inputId
+                *     the input id numbered starting from 1 to 64, and `1` indicates the first input.
+                *     Specially, `-1` indicates all inputs.
+                * @return a reference to this object.
+                * @throws IllegalArgumentException if {@code inputId} is neither in the range of
+                *     1 to 64 nor -1.
+                */
+               public Builder select(int inputId) {
+                       if (inputId > 0 && inputId <= 64) {
+                               inputMask |= 1L << (inputId - 1);
+                       } else if (inputId == -1) {
+                               inputMask = -1L;
+                       } else {
+                               throw new IllegalArgumentException("The inputId must be in the range of 1 to 64, or be -1.");
+                       }
+
+                       return this;
+               }
+
+               /**
+                * Creates the {@link InputSelection} for the inputs selected so far.
+                *
+                * @return a new {@code InputSelection} carrying the accumulated input mask.
+                */
+               public InputSelection build() {
+                       return new InputSelection(inputMask, isALLMaskOf2(inputMask));
+               }
+       }
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InputSelectionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InputSelectionTest.java
new file mode 100644
index 0000000..4f308b0
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InputSelectionTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.streaming.api.operators.InputSelection.Builder;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link InputSelection} and its {@link Builder}.
+ */
+public class InputSelectionTest {
+
+       @Test
+       public void testIsALLMaskOf2() {
+               final InputSelection firstAndSecond = new Builder().select(1).select(2).build();
+               final InputSelection thirdOnly = new Builder().select(3).build();
+
+               // Selections that contain both the first and the second input.
+               assertTrue(InputSelection.ALL.isALLMaskOf2());
+               assertTrue(firstAndSecond.isALLMaskOf2());
+
+               // Selections that miss at least one of the first two inputs.
+               assertFalse(InputSelection.FIRST.isALLMaskOf2());
+               assertFalse(InputSelection.SECOND.isALLMaskOf2());
+               assertFalse(thirdOnly.isALLMaskOf2());
+       }
+
+       @Test
+       public void testFairSelectNextIndexOutOf2() {
+               final InputSelection all = InputSelection.ALL;
+               final InputSelection first = InputSelection.FIRST;
+               final InputSelection second = InputSelection.SECOND;
+               final InputSelection firstAndSecond = new Builder().select(1).select(2).build();
+
+               // Both inputs selected and available: the result alternates with the last read index.
+               assertEquals(1, all.fairSelectNextIndexOutOf2(3, 0));
+               assertEquals(0, firstAndSecond.fairSelectNextIndexOutOf2(3, 1));
+
+               // Both inputs selected, but at most one of them available.
+               assertEquals(1, all.fairSelectNextIndexOutOf2(2, 0));
+               assertEquals(1, all.fairSelectNextIndexOutOf2(2, 1));
+               assertEquals(0, all.fairSelectNextIndexOutOf2(1, 0));
+               assertEquals(0, all.fairSelectNextIndexOutOf2(1, 1));
+               assertEquals(-1, all.fairSelectNextIndexOutOf2(0, 0));
+               assertEquals(-1, all.fairSelectNextIndexOutOf2(0, 1));
+
+               // Only the first input selected.
+               assertEquals(0, first.fairSelectNextIndexOutOf2(1, 0));
+               assertEquals(0, first.fairSelectNextIndexOutOf2(3, 0));
+               assertEquals(-1, first.fairSelectNextIndexOutOf2(2, 0));
+               assertEquals(-1, first.fairSelectNextIndexOutOf2(0, 0));
+
+               // Only the second input selected.
+               assertEquals(1, second.fairSelectNextIndexOutOf2(2, 1));
+               assertEquals(1, second.fairSelectNextIndexOutOf2(3, 1));
+               assertEquals(-1, second.fairSelectNextIndexOutOf2(1, 1));
+               assertEquals(-1, second.fairSelectNextIndexOutOf2(0, 1));
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testUnsupportedFairSelectNextIndexOutOf2() {
+               // An available-inputs mask of 7 includes a third input, which is not supported.
+               final int threeAvailableInputs = 7;
+               InputSelection.ALL.fairSelectNextIndexOutOf2(threeAvailableInputs, 0);
+       }
+
+       /**
+        * Tests for {@link Builder}.
+        */
+       public static class BuilderTest {
+
+               @Test
+               public void testSelect() {
+                       final long firstOnly = new Builder().select(1).build().getInputMask();
+                       final long firstThree = new Builder().select(1).select(2).select(3).build().getInputMask();
+                       final long lastOnly = new Builder().select(64).build().getInputMask();
+                       final long everything = new Builder().select(-1).build().getInputMask();
+
+                       assertEquals(1L, firstOnly);
+                       assertEquals(7L, firstThree);
+
+                       // Input id 64 maps to the highest bit; -1 sets every bit of the mask.
+                       assertEquals(0x8000_0000_0000_0000L, lastOnly);
+                       assertEquals(0xffff_ffff_ffff_ffffL, everything);
+               }
+
+               @Test(expected = IllegalArgumentException.class)
+               public void testIllegalInputId1() {
+                       final Builder builder = new Builder();
+                       builder.select(-2);
+               }
+
+               @Test(expected = IllegalArgumentException.class)
+               public void testIllegalInputId2() {
+                       final Builder builder = new Builder();
+                       builder.select(65);
+               }
+       }
+}

Reply via email to