This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 67fbe86 [FLINK-11876] Introduce new InputSelectable, BoundedOneInput
and BoundedMultiInput interfaces for stream operators
67fbe86 is described below
commit 67fbe860d5fd97063634b9f226195d6ce1870bce
Author: sunhaibotb <[email protected]>
AuthorDate: Wed May 29 10:31:39 2019 +0800
[FLINK-11876] Introduce new InputSelectable, BoundedOneInput and
BoundedMultiInput interfaces for stream operators
---
.../streaming/api/operators/BoundedMultiInput.java | 33 +++++
.../streaming/api/operators/BoundedOneInput.java | 32 +++++
.../streaming/api/operators/InputSelectable.java | 47 +++++++
.../streaming/api/operators/InputSelection.java | 147 +++++++++++++++++++++
.../api/operators/InputSelectionTest.java | 95 +++++++++++++
5 files changed, 354 insertions(+)
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BoundedMultiInput.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BoundedMultiInput.java
new file mode 100755
index 0000000..af1acb4
--- /dev/null
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BoundedMultiInput.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Interface for the multi-input operators that can process EndOfInput event.
+ */
+@PublicEvolving
+public interface BoundedMultiInput {
+
+ /**
+ * It is notified that no more data will arrive on the input identified
by the {@code inputId}.
+ * The {@code inputId} is numbered starting from 1, and `1` indicates
the first input.
+ */
+ void endInput(int inputId) throws Exception;
+}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BoundedOneInput.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BoundedOneInput.java
new file mode 100755
index 0000000..cd35f45
--- /dev/null
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BoundedOneInput.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Interface for the one-input operators that can process EndOfInput event.
+ */
+@PublicEvolving
+public interface BoundedOneInput {
+
+ /**
+ * It is notified that no more data will arrive on the input.
+ */
+ void endInput() throws Exception;
+}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InputSelectable.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InputSelectable.java
new file mode 100755
index 0000000..df0fe93
--- /dev/null
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InputSelectable.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Interface for stream operators that can select the input to get
+ * {@link org.apache.flink.streaming.runtime.streamrecord.StreamRecord}.
+ *
+ * <p><b>IMPORTANT:</b> This interface is a loose contract. The runtime may
read multiple
+ * records continuously before calling {@code nextSelection()} again to
determine whether
+ * to change the input to be read. That is, it is not guaranteed that {@code
nextSelection()}
+ * will be called immediately after the operator has processed a record and
the reading input
+ * will be changed according to {@link InputSelection} returned. This means
that the operator
+ * may receive some data that it does not currently want to process.
Therefore, if an operator
+ * needs a strict convention, it must cache the unexpected data itself and
handle them correctly.
+ *
+ * <p>This interface also makes the following conventions:
+ * 1.The runtime must call {@code nextSelection()} to determine the input to
read the first record.
+ * 2.When the input being read reaches the end, the runtime must call {@code
nextSelection()} to
+ * determine the next input to be read.
+ */
+@PublicEvolving
+public interface InputSelectable {
+
+ /**
+ * Returns the next {@link InputSelection} that wants to get the record.
+ * This method is guaranteed to not be called concurrently with other
methods of the operator.
+ */
+ InputSelection nextSelection();
+}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InputSelection.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InputSelection.java
new file mode 100755
index 0000000..5b929f4
--- /dev/null
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InputSelection.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+
+/**
+ * Describe the input selection that stream operators want to read records.
+ */
+@PublicEvolving
+public final class InputSelection implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * The {@code InputSelection} instance which indicates to select all
inputs.
+ */
+ public static final InputSelection ALL = new InputSelection(-1,
Boolean.TRUE);
+
+ /**
+ * The {@code InputSelection} instance which indicates to select the
first input.
+ */
+ public static final InputSelection FIRST = new
Builder().select(1).build();
+
+ /**
+ * The {@code InputSelection} instance which indicates to select the
second input.
+ */
+ public static final InputSelection SECOND = new
Builder().select(2).build();
+
+ private final long inputMask;
+
+ private final boolean isALLMaskOf2;
+
+ private InputSelection(long inputMask, boolean isALLMaskOf2) {
+ this.inputMask = inputMask;
+ this.isALLMaskOf2 = isALLMaskOf2;
+ }
+
+ public long getInputMask() {
+ return inputMask;
+ }
+
+ /**
+ * Tells whether or not the input mask includes all of two inputs.
+ *
+ * @return {@code true} if the input mask includes all of two inputs,
false otherwise.
+ */
+ public boolean isALLMaskOf2() {
+ return isALLMaskOf2;
+ }
+
+ /**
+ * Fairly select one of the two inputs for reading. When {@code
inputMask} includes two inputs and
+ * both inputs are available, alternately select one of them.
Otherwise, select the available one
+ * of {@code inputMask}, or return -1 to indicate no input is selected.
+ *
+ * <p>Note that this supports only two inputs for performance reasons.
+ *
+ * @param availableInputsMask The mask of all available inputs.
+ * @param lastReadInputIndex The index of last read input.
+ * @return the index of the input for reading or -1, and -1 indicates
no input is selected (
+ * {@code inputMask} is empty or the inputs in {@code
inputMask} are unavailable).
+ */
+ public int fairSelectNextIndexOutOf2(int availableInputsMask, int
lastReadInputIndex) {
+ int selectionMask = (int) inputMask;
+ int combineMask = availableInputsMask & selectionMask;
+
+ if (combineMask == 3) {
+ return lastReadInputIndex == 0 ? 1 : 0;
+ } else if (combineMask >= 0 && combineMask < 3) {
+ return combineMask - 1;
+ }
+
+ throw new UnsupportedOperationException("Only two inputs are
supported.");
+ }
+
+ private static boolean isALLMaskOf2(long inputMask) {
+ return (3 & inputMask) == 3;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ InputSelection that = (InputSelection) o;
+ return inputMask == that.inputMask;
+ }
+
+ @Override
+ public String toString() {
+ return String.valueOf(inputMask);
+ }
+
+ /**
+ * Utility class for creating {@link InputSelection}.
+ */
+ public static final class Builder {
+
+ private long inputMask = 0;
+
+ /**
+ * Selects an input identified by the given {@code inputId}.
+ *
+ * @param inputId
+ * the input id numbered starting from 1 to 64, and `1`
indicates the first input.
+ * Specially, `-1` indicates all inputs.
+ * @return a reference to this object.
+ */
+ public Builder select(int inputId) {
+ if (inputId > 0 && inputId <= 64){
+ inputMask |= 1L << (inputId - 1);
+ } else if (inputId == -1L) {
+ inputMask = -1L;
+ } else {
+ throw new IllegalArgumentException("The inputId
must be in the range of 1 to 64, or be -1.");
+ }
+
+ return this;
+ }
+
+ public InputSelection build() {
+ return new InputSelection(inputMask,
isALLMaskOf2(inputMask));
+ }
+ }
+}
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InputSelectionTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InputSelectionTest.java
new file mode 100644
index 0000000..4f308b0
--- /dev/null
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InputSelectionTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.streaming.api.operators.InputSelection.Builder;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link InputSelection}.
+ */
+public class InputSelectionTest {
+
+ @Test
+ public void testIsALLMaskOf2() {
+ assertTrue(InputSelection.ALL.isALLMaskOf2());
+ assertTrue(new
Builder().select(1).select(2).build().isALLMaskOf2());
+
+ assertFalse(InputSelection.FIRST.isALLMaskOf2());
+ assertFalse(InputSelection.SECOND.isALLMaskOf2());
+ assertFalse(new Builder().select(3).build().isALLMaskOf2());
+ }
+
+ @Test
+ public void testFairSelectNextIndexOutOf2() {
+ assertEquals(1, InputSelection.ALL.fairSelectNextIndexOutOf2(3,
0));
+ assertEquals(0, new
Builder().select(1).select(2).build().fairSelectNextIndexOutOf2(3, 1));
+
+ assertEquals(1, InputSelection.ALL.fairSelectNextIndexOutOf2(2,
0));
+ assertEquals(1, InputSelection.ALL.fairSelectNextIndexOutOf2(2,
1));
+ assertEquals(0, InputSelection.ALL.fairSelectNextIndexOutOf2(1,
0));
+ assertEquals(0, InputSelection.ALL.fairSelectNextIndexOutOf2(1,
1));
+ assertEquals(-1,
InputSelection.ALL.fairSelectNextIndexOutOf2(0, 0));
+ assertEquals(-1,
InputSelection.ALL.fairSelectNextIndexOutOf2(0, 1));
+
+ assertEquals(0,
InputSelection.FIRST.fairSelectNextIndexOutOf2(1, 0));
+ assertEquals(0,
InputSelection.FIRST.fairSelectNextIndexOutOf2(3, 0));
+ assertEquals(-1,
InputSelection.FIRST.fairSelectNextIndexOutOf2(2, 0));
+ assertEquals(-1,
InputSelection.FIRST.fairSelectNextIndexOutOf2(0, 0));
+
+ assertEquals(1,
InputSelection.SECOND.fairSelectNextIndexOutOf2(2, 1));
+ assertEquals(1,
InputSelection.SECOND.fairSelectNextIndexOutOf2(3, 1));
+ assertEquals(-1,
InputSelection.SECOND.fairSelectNextIndexOutOf2(1, 1));
+ assertEquals(-1,
InputSelection.SECOND.fairSelectNextIndexOutOf2(0, 1));
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testUnsupportedFairSelectNextIndexOutOf2() {
+ InputSelection.ALL.fairSelectNextIndexOutOf2(7, 0);
+ }
+
+ /**
+ * Tests for {@link Builder}.
+ */
+ public static class BuilderTest {
+
+ @Test
+ public void testSelect() {
+ assertEquals(1L, new
Builder().select(1).build().getInputMask());
+ assertEquals(7L, new
Builder().select(1).select(2).select(3).build().getInputMask());
+
+ assertEquals(0x8000_0000_0000_0000L, new
Builder().select(64).build().getInputMask());
+ assertEquals(0xffff_ffff_ffff_ffffL, new
Builder().select(-1).build().getInputMask());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testIllegalInputId1() {
+ new Builder().select(-2);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testIllegalInputId2() {
+ new Builder().select(65);
+ }
+ }
+}