This is an automated email from the ASF dual-hosted git repository. benjobs pushed a commit to branch tuple2 in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit 66226d9391bf7ccdd553b8c1ba80c5510535e3e6 Author: benjobs <[email protected]> AuthorDate: Fri Jul 21 14:27:31 2023 +0800 Tuple support --- .../org/apache/streampark/common/tuple/Tuple.java | 63 +++++++ .../org/apache/streampark/common/tuple/Tuple1.java | 114 +++++++++++++ .../org/apache/streampark/common/tuple/Tuple2.java | 164 ++++++++++++++++++ .../org/apache/streampark/common/tuple/Tuple3.java | 172 +++++++++++++++++++ .../org/apache/streampark/common/tuple/Tuple4.java | 188 +++++++++++++++++++++ 5 files changed, 701 insertions(+) diff --git a/streampark-common/src/main/java/org/apache/streampark/common/tuple/Tuple.java b/streampark-common/src/main/java/org/apache/streampark/common/tuple/Tuple.java new file mode 100644 index 000000000..563fec0e8 --- /dev/null +++ b/streampark-common/src/main/java/org/apache/streampark/common/tuple/Tuple.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.common.tuple; + +/** + * The base class of all tuples. Tuples have a fix length and contain a set of fields, which may all + * be of different types. Because Tuples are strongly typed, each distinct tuple length is + * represented by its own class. Tuples exists with up to 25 fields and are described in the classes + * {@link Tuple1} to {@link Tuple4}. + * + * <p>The fields in the tuples may be accessed directly a public fields, or via position (zero + * indexed) {@link #get(int)}. + * + * <p>Tuples are in principle serializable. However, they may contain non-serializable fields, in + * which case serialization will fail. + */ +public abstract class Tuple implements java.io.Serializable { + + private static final long serialVersionUID = 1L; + + /** + * Gets the field at the specified position. + * + * @param pos The position of the field, zero indexed. + * @return The field at the specified position. + * @throws IndexOutOfBoundsException Thrown, if the position is negative, or equal to, or larger + * than the number of fields. + */ + public abstract <T> T get(int pos); + + /** + * Sets the field at the specified position. + * + * @param value The value to be assigned to the field at the specified position. + * @param pos The position of the field, zero indexed. + * @throws IndexOutOfBoundsException Thrown, if the position is negative, or equal to, or larger + * than the number of fields. + */ + public abstract <T> void set(T value, int pos); + + /** + * Shallow tuple copy. + * + * @return A new Tuple with the same fields as this. + */ + public abstract <T extends Tuple> T copy(); +} diff --git a/streampark-common/src/main/java/org/apache/streampark/common/tuple/Tuple1.java b/streampark-common/src/main/java/org/apache/streampark/common/tuple/Tuple1.java new file mode 100644 index 000000000..87a3003ad --- /dev/null +++ b/streampark-common/src/main/java/org/apache/streampark/common/tuple/Tuple1.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.common.tuple; + +import java.util.Objects; + +public class Tuple1<T0> extends Tuple { + + private static final long serialVersionUID = 1L; + + /** Field 0 of the tuple. */ + public T0 f0; + + /** Creates a new tuple where all fields are null. */ + public Tuple1() {} + + /** + * Creates a new tuple and assigns the given values to the tuple's fields. + * + * @param f0 The value for field 0 + */ + public Tuple1(T0 f0) { + this.f0 = f0; + } + + @Override + @SuppressWarnings("unchecked") + public <T> T get(int pos) { + if (pos == 0) { + return (T) this.f0; + } + throw new IndexOutOfBoundsException(String.valueOf(pos)); + } + + @Override + @SuppressWarnings("unchecked") + public <T> void set(T value, int pos) { + if (pos == 0) { + this.f0 = (T0) value; + } else { + throw new IndexOutOfBoundsException(String.valueOf(pos)); + } + } + + /** + * Sets new values to all fields of the tuple. + * + * @param f0 The value for field 0 + */ + public void set(T0 f0) { + this.f0 = f0; + } + + /** + * Deep equality for tuples by calling equals() on the tuple members. + * + * @param o the object checked for equality + * @return true if this is equal to o. + */ + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof Tuple1)) { + return false; + } + @SuppressWarnings("rawtypes") + Tuple1 tuple = (Tuple1) o; + return Objects.equals(f0, tuple.f0); + } + + @Override + public int hashCode() { + return f0 != null ? f0.hashCode() : 0; + } + + /** + * Shallow tuple copy. + * + * @return A new Tuple with the same fields as this. + */ + @Override + @SuppressWarnings("unchecked") + public Tuple1<T0> copy() { + return new Tuple1<>(this.f0); + } + + /** + * Creates a new tuple and assigns the given values to the tuple's fields. This is more convenient + * than using the constructor, because the compiler can infer the generic type arguments + * implicitly. For example: {@code Tuple3.of(n, x, s)} instead of {@code new Tuple3<Integer, + * Double, String>(n, x, s)} + */ + public static <T0> Tuple1<T0> of(T0 f0) { + return new Tuple1<>(f0); + } +} diff --git a/streampark-common/src/main/java/org/apache/streampark/common/tuple/Tuple2.java b/streampark-common/src/main/java/org/apache/streampark/common/tuple/Tuple2.java new file mode 100644 index 000000000..c88b7a89c --- /dev/null +++ b/streampark-common/src/main/java/org/apache/streampark/common/tuple/Tuple2.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.common.tuple; + +import java.util.Objects; + +/** + * A tuple with 2 fields. Tuples are strongly typed; each field may be of a separate type. The + * fields of the tuple can be accessed directly as public fields (f0, f1, ...) or via their position + * through the {@link #get(int)} method. The tuple field positions start at zero. + * + * <p>Tuples are mutable types, meaning that their fields can be re-assigned. This allows functions + * that work with Tuples to reuse objects in order to reduce pressure on the garbage collector. + * + * <p>Warning: If you subclass Tuple2, then be sure to either + * + * <ul> + * <li>not add any new fields, or + * <li>make it a POJO, and always declare the element type of your DataStreams/DataSets to your + * descendant type. (That is, if you have a "class Foo extends Tuple2", then don't use + * instances of Foo in a DataStream<Tuple2> / DataSet<Tuple2>, but declare it as + * DataStream<Foo> / DataSet<Foo>.) + * </ul> + * + * @see Tuple + * @param <T0> The type of field 0 + * @param <T1> The type of field 1 + */ +public class Tuple2<T0, T1> extends Tuple { + + private static final long serialVersionUID = 1L; + + /** Field 0 of the tuple. */ + public T0 f0; + /** Field 1 of the tuple. */ + public T1 f1; + + /** Creates a new tuple where all fields are null. */ + public Tuple2() {} + + /** + * Creates a new tuple and assigns the given values to the tuple's fields. + * + * @param f0 The value for field 0 + * @param f1 The value for field 1 + */ + public Tuple2(T0 f0, T1 f1) { + this.f0 = f0; + this.f1 = f1; + } + + @Override + @SuppressWarnings("unchecked") + public <T> T get(int pos) { + switch (pos) { + case 0: + return (T) this.f0; + case 1: + return (T) this.f1; + default: + throw new IndexOutOfBoundsException(String.valueOf(pos)); + } + } + + @SuppressWarnings("unchecked") + public <T> void set(T value, int pos) { + switch (pos) { + case 0: + this.f0 = (T0) value; + break; + case 1: + this.f1 = (T1) value; + break; + default: + throw new IndexOutOfBoundsException(String.valueOf(pos)); + } + } + + /** + * Sets new values to all fields of the tuple. + * + * @param f0 The value for field 0 + * @param f1 The value for field 1 + */ + public void set(T0 f0, T1 f1) { + this.f0 = f0; + this.f1 = f1; + } + + /** + * Returns a shallow copy of the tuple with swapped values. + * + * @return shallow copy of the tuple with swapped values + */ + public Tuple2<T1, T0> swap() { + return new Tuple2<T1, T0>(f1, f0); + } + + /** + * Deep equality for tuples by calling equals() on the tuple members. + * + * @param o the object checked for equality + * @return true if this is equal to o. + */ + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof Tuple2)) { + return false; + } + @SuppressWarnings("rawtypes") + Tuple2 tuple = (Tuple2) o; + if (!Objects.equals(f0, tuple.f0)) { + return false; + } + return Objects.equals(f1, tuple.f1); + } + + @Override + public int hashCode() { + int result = f0 != null ? f0.hashCode() : 0; + result = 31 * result + (f1 != null ? f1.hashCode() : 0); + return result; + } + + /** + * Shallow tuple copy. + * + * @return A new Tuple with the same fields as this. + */ + @Override + @SuppressWarnings("unchecked") + public Tuple2<T0, T1> copy() { + return new Tuple2<>(this.f0, this.f1); + } + + /** + * Creates a new tuple and assigns the given values to the tuple's fields. This is more convenient + * than using the constructor, because the compiler can infer the generic type arguments + * implicitly. For example: {@code Tuple3.of(n, x, s)} instead of {@code new Tuple3<Integer, + * Double, String>(n, x, s)} + */ + public static <T0, T1> Tuple2<T0, T1> of(T0 f0, T1 f1) { + return new Tuple2<>(f0, f1); + } +} diff --git a/streampark-common/src/main/java/org/apache/streampark/common/tuple/Tuple3.java b/streampark-common/src/main/java/org/apache/streampark/common/tuple/Tuple3.java new file mode 100644 index 000000000..afa28804d --- /dev/null +++ b/streampark-common/src/main/java/org/apache/streampark/common/tuple/Tuple3.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.common.tuple; + +import java.util.Objects; + +/** + * A tuple with 3 fields. Tuples are strongly typed; each field may be of a separate type. The + * fields of the tuple can be accessed directly as public fields (f0, f1, ...) or via their position + * through the {@link #get(int)} method. The tuple field positions start at zero. + * + * <p>Tuples are mutable types, meaning that their fields can be re-assigned. This allows functions + * that work with Tuples to reuse objects in order to reduce pressure on the garbage collector. + * + * <p>Warning: If you subclass Tuple3, then be sure to either + * + * <ul> + * <li>not add any new fields, or + * <li>make it a POJO, and always declare the element type of your DataStreams/DataSets to your + * descendant type. (That is, if you have a "class Foo extends Tuple3", then don't use + * instances of Foo in a DataStream<Tuple3> / DataSet<Tuple3>, but declare it as + * DataStream<Foo> / DataSet<Foo>.) + * </ul> + * + * @see Tuple + * @param <T0> The type of field 0 + * @param <T1> The type of field 1 + * @param <T2> The type of field 2 + */ +public class Tuple3<T0, T1, T2> extends Tuple { + + private static final long serialVersionUID = 1L; + + /** Field 0 of the tuple. */ + public T0 f0; + /** Field 1 of the tuple. */ + public T1 f1; + /** Field 2 of the tuple. */ + public T2 f2; + + /** Creates a new tuple where all fields are null. */ + public Tuple3() {} + + /** + * Creates a new tuple and assigns the given values to the tuple's fields. + * + * @param f0 The value for field 0 + * @param f1 The value for field 1 + * @param f2 The value for field 2 + */ + public Tuple3(T0 f0, T1 f1, T2 f2) { + this.f0 = f0; + this.f1 = f1; + this.f2 = f2; + } + + @Override + @SuppressWarnings("unchecked") + public <T> T get(int pos) { + switch (pos) { + case 0: + return (T) this.f0; + case 1: + return (T) this.f1; + case 2: + return (T) this.f2; + default: + throw new IndexOutOfBoundsException(String.valueOf(pos)); + } + } + + @Override + @SuppressWarnings("unchecked") + public <T> void set(T value, int pos) { + switch (pos) { + case 0: + this.f0 = (T0) value; + break; + case 1: + this.f1 = (T1) value; + break; + case 2: + this.f2 = (T2) value; + break; + default: + throw new IndexOutOfBoundsException(String.valueOf(pos)); + } + } + + /** + * Sets new values to all fields of the tuple. + * + * @param f0 The value for field 0 + * @param f1 The value for field 1 + * @param f2 The value for field 2 + */ + public void set(T0 f0, T1 f1, T2 f2) { + this.f0 = f0; + this.f1 = f1; + this.f2 = f2; + } + + /** + * Deep equality for tuples by calling equals() on the tuple members. + * + * @param o the object checked for equality + * @return true if this is equal to o. + */ + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof Tuple3)) { + return false; + } + @SuppressWarnings("rawtypes") + Tuple3 tuple = (Tuple3) o; + if (!Objects.equals(f0, tuple.f0)) { + return false; + } + if (!Objects.equals(f1, tuple.f1)) { + return false; + } + return Objects.equals(f2, tuple.f2); + } + + @Override + public int hashCode() { + int result = f0 != null ? f0.hashCode() : 0; + result = 31 * result + (f1 != null ? f1.hashCode() : 0); + result = 31 * result + (f2 != null ? f2.hashCode() : 0); + return result; + } + + /** + * Shallow tuple copy. + * + * @return A new Tuple with the same fields as this. + */ + @Override + @SuppressWarnings("unchecked") + public Tuple3<T0, T1, T2> copy() { + return new Tuple3<>(this.f0, this.f1, this.f2); + } + + /** + * Creates a new tuple and assigns the given values to the tuple's fields. This is more convenient + * than using the constructor, because the compiler can infer the generic type arguments + * implicitly. For example: {@code Tuple3.of(n, x, s)} instead of {@code new Tuple3<Integer, + * Double, String>(n, x, s)} + */ + public static <T0, T1, T2> Tuple3<T0, T1, T2> of(T0 f0, T1 f1, T2 f2) { + return new Tuple3<>(f0, f1, f2); + } +} diff --git a/streampark-common/src/main/java/org/apache/streampark/common/tuple/Tuple4.java b/streampark-common/src/main/java/org/apache/streampark/common/tuple/Tuple4.java new file mode 100644 index 000000000..cdc2b16cb --- /dev/null +++ b/streampark-common/src/main/java/org/apache/streampark/common/tuple/Tuple4.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.common.tuple; + +import java.util.Objects; + +/** + * A tuple with 4 fields. Tuples are strongly typed; each field may be of a separate type. The + * fields of the tuple can be accessed directly as public fields (f0, f1, ...) or via their position + * through the {@link #get(int)} method. The tuple field positions start at zero. + * + * <p>Tuples are mutable types, meaning that their fields can be re-assigned. This allows functions + * that work with Tuples to reuse objects in order to reduce pressure on the garbage collector. + * + * <p>Warning: If you subclass Tuple4, then be sure to either + * + * <ul> + * <li>not add any new fields, or + * <li>make it a POJO, and always declare the element type of your DataStreams/DataSets to your + * descendant type. (That is, if you have a "class Foo extends Tuple4", then don't use + * instances of Foo in a DataStream<Tuple4> / DataSet<Tuple4>, but declare it as + * DataStream<Foo> / DataSet<Foo>.) + * </ul> + * + * @see Tuple + * @param <T0> The type of field 0 + * @param <T1> The type of field 1 + * @param <T2> The type of field 2 + * @param <T3> The type of field 3 + */ +public class Tuple4<T0, T1, T2, T3> extends Tuple { + + private static final long serialVersionUID = 1L; + + /** Field 0 of the tuple. */ + public T0 f0; + /** Field 1 of the tuple. */ + public T1 f1; + /** Field 2 of the tuple. */ + public T2 f2; + /** Field 3 of the tuple. */ + public T3 f3; + + /** Creates a new tuple where all fields are null. */ + public Tuple4() {} + + /** + * Creates a new tuple and assigns the given values to the tuple's fields. + * + * @param f0 The value for field 0 + * @param f1 The value for field 1 + * @param f2 The value for field 2 + * @param f3 The value for field 3 + */ + public Tuple4(T0 f0, T1 f1, T2 f2, T3 f3) { + this.f0 = f0; + this.f1 = f1; + this.f2 = f2; + this.f3 = f3; + } + + @Override + @SuppressWarnings("unchecked") + public <T> T get(int pos) { + switch (pos) { + case 0: + return (T) this.f0; + case 1: + return (T) this.f1; + case 2: + return (T) this.f2; + case 3: + return (T) this.f3; + default: + throw new IndexOutOfBoundsException(String.valueOf(pos)); + } + } + + @Override + @SuppressWarnings("unchecked") + public <T> void set(T value, int pos) { + switch (pos) { + case 0: + this.f0 = (T0) value; + break; + case 1: + this.f1 = (T1) value; + break; + case 2: + this.f2 = (T2) value; + break; + case 3: + this.f3 = (T3) value; + break; + default: + throw new IndexOutOfBoundsException(String.valueOf(pos)); + } + } + + /** + * Sets new values to all fields of the tuple. + * + * @param f0 The value for field 0 + * @param f1 The value for field 1 + * @param f2 The value for field 2 + * @param f3 The value for field 3 + */ + public void set(T0 f0, T1 f1, T2 f2, T3 f3) { + this.f0 = f0; + this.f1 = f1; + this.f2 = f2; + this.f3 = f3; + } + + /** + * Deep equality for tuples by calling equals() on the tuple members. + * + * @param o the object checked for equality + * @return true if this is equal to o. + */ + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof Tuple4)) { + return false; + } + @SuppressWarnings("rawtypes") + Tuple4 tuple = (Tuple4) o; + if (!Objects.equals(f0, tuple.f0)) { + return false; + } + if (!Objects.equals(f1, tuple.f1)) { + return false; + } + if (!Objects.equals(f2, tuple.f2)) { + return false; + } + return Objects.equals(f3, tuple.f3); + } + + @Override + public int hashCode() { + int result = f0 != null ? f0.hashCode() : 0; + result = 31 * result + (f1 != null ? f1.hashCode() : 0); + result = 31 * result + (f2 != null ? f2.hashCode() : 0); + result = 31 * result + (f3 != null ? f3.hashCode() : 0); + return result; + } + + /** + * Shallow tuple copy. + * + * @return A new Tuple with the same fields as this. + */ + @Override + @SuppressWarnings("unchecked") + public Tuple4<T0, T1, T2, T3> copy() { + return new Tuple4<>(this.f0, this.f1, this.f2, this.f3); + } + + /** + * Creates a new tuple and assigns the given values to the tuple's fields. This is more convenient + * than using the constructor, because the compiler can infer the generic type arguments + * implicitly. For example: {@code Tuple3.of(n, x, s)} instead of {@code new Tuple3<Integer, + * Double, String>(n, x, s)} + */ + public static <T0, T1, T2, T3> Tuple4<T0, T1, T2, T3> of(T0 f0, T1 f1, T2 f2, T3 f3) { + return new Tuple4<>(f0, f1, f2, f3); + } +}
