Repository: flink Updated Branches: refs/heads/master a9b5579b3 -> e236680f1
[hotfix][filesystem] Remove incorrect equals methods in StreamWriters This closes #6262. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e236680f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e236680f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e236680f Branch: refs/heads/master Commit: e236680f14ebbaf97243dc15718aad93b68b584a Parents: a9b5579 Author: Piotr Nowojski <[email protected]> Authored: Thu Jul 5 11:28:35 2018 +0200 Committer: Piotr Nowojski <[email protected]> Committed: Thu Jul 5 15:19:41 2018 +0200 ---------------------------------------------------------------------- .../connectors/fs/AvroKeyValueSinkWriter.java | 25 +------- .../connectors/fs/SequenceFileWriter.java | 36 +++++------- .../connectors/fs/StreamWriterBase.java | 22 +------ .../streaming/connectors/fs/StringWriter.java | 25 +------- .../fs/AvroKeyValueSinkWriterTest.java | 8 +-- .../connectors/fs/RollingSinkITCase.java | 2 +- .../connectors/fs/SequenceFileWriterTest.java | 8 +-- .../fs/StreamWriterBaseComparator.java | 60 ++++++++++++++++++++ .../connectors/fs/StringWriterTest.java | 8 +-- .../fs/bucketing/BucketingSinkTest.java | 3 +- 10 files changed, 93 insertions(+), 104 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e236680f/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java index 6b2f7d6..0f73e8c 100644 --- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java +++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java @@ -40,7 +40,6 @@ import java.io.IOException; import java.io.OutputStream; import java.util.Arrays; import java.util.Map; -import java.util.Objects; /** * Implementation of AvroKeyValue writer that can be used in Sink. @@ -204,7 +203,7 @@ public class AvroKeyValueSinkWriter<K, V> extends StreamWriterBase<Tuple2<K, V>> } @Override - public Writer<Tuple2<K, V>> duplicate() { + public AvroKeyValueSinkWriter<K, V> duplicate() { return new AvroKeyValueSinkWriter<>(this); } @@ -335,25 +334,7 @@ public class AvroKeyValueSinkWriter<K, V> extends StreamWriterBase<Tuple2<K, V>> } } - @Override - public int hashCode() { - return Objects.hash(super.hashCode(), properties); - } - - @Override - public boolean equals(Object other) { - if (this == other) { - return true; - } - if (other == null) { - return false; - } - if (getClass() != other.getClass()) { - return false; - } - AvroKeyValueSinkWriter<K, V> writer = (AvroKeyValueSinkWriter<K, V>) other; - // field comparison - return Objects.equals(properties, writer.properties) - && super.equals(other); + Map<String, String> getProperties() { + return properties; } } http://git-wip-us.apache.org/repos/asf/flink/blob/e236680f/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java index 2f42ef7..17b16dd 100644 --- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java +++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java @@ -34,7 +34,6 @@ import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; import java.io.IOException; -import java.util.Objects; /** * A {@link Writer} that writes the bucket files as Hadoop {@link SequenceFile SequenceFiles}. @@ -152,32 +151,23 @@ public class SequenceFileWriter<K extends Writable, V extends Writable> extends } @Override - public Writer<Tuple2<K, V>> duplicate() { + public SequenceFileWriter<K, V> duplicate() { return new SequenceFileWriter<>(this); } - @Override - public int hashCode() { - return Objects.hash(super.hashCode(), compressionCodecName, compressionType, keyClass, valueClass); + String getCompressionCodecName() { + return compressionCodecName; } - @Override - public boolean equals(Object other) { - if (this == other) { - return true; - } - if (other == null) { - return false; - } - if (getClass() != other.getClass()) { - return false; - } - SequenceFileWriter<K, V> writer = (SequenceFileWriter<K, V>) other; - // field comparison - return Objects.equals(compressionCodecName, writer.compressionCodecName) - && Objects.equals(compressionType, writer.compressionType) - && Objects.equals(keyClass, writer.keyClass) - && Objects.equals(valueClass, writer.valueClass) - && super.equals(other); + SequenceFile.CompressionType getCompressionType() { + return compressionType; + } + + Class<K> getKeyClass() { + return keyClass; + } + + Class<V> getValueClass() { + return valueClass; } } http://git-wip-us.apache.org/repos/asf/flink/blob/e236680f/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java index f625ef3..d3035a5 100644 --- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java +++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java @@ -23,7 +23,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import java.io.IOException; -import java.util.Objects; /** * Base class for {@link Writer Writers} that write to a {@link FSDataOutputStream}. @@ -102,24 +101,7 @@ public abstract class StreamWriterBase<T> implements Writer<T> { } } - @Override - public int hashCode() { - return Boolean.hashCode(syncOnFlush); - } - - @Override - public boolean equals(Object other) { - if (this == other) { - return true; - } - if (other == null) { - return false; - } - if (getClass() != other.getClass()) { - return false; - } - StreamWriterBase<T> writer = (StreamWriterBase<T>) other; - // field comparison - return Objects.equals(syncOnFlush, writer.syncOnFlush); + public boolean isSyncOnFlush() { + return syncOnFlush; } } http://git-wip-us.apache.org/repos/asf/flink/blob/e236680f/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java index 5c81b15..122bc7f 100644 --- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java +++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java @@ -26,7 +26,6 @@ import java.io.IOException; import java.nio.charset.Charset; import java.nio.charset.IllegalCharsetNameException; import java.nio.charset.UnsupportedCharsetException; -import java.util.Objects; /** * A {@link Writer} that uses {@code toString()} on the input elements and writes them to @@ -87,29 +86,11 @@ public class StringWriter<T> extends StreamWriterBase<T> { } @Override - public Writer<T> duplicate() { + public StringWriter<T> duplicate() { return new StringWriter<>(this); } - @Override - public int hashCode() { - return Objects.hash(super.hashCode(), charsetName); - } - - @Override - public boolean equals(Object other) { - if (this == other) { - return true; - } - if (other == null) { - return false; - } - if (getClass() != other.getClass()) { - return false; - } - StringWriter<T> writer = (StringWriter<T>) other; - // field comparison - return Objects.equals(charsetName, writer.charsetName) - && super.equals(other); + String getCharsetName() { + return charsetName; } } http://git-wip-us.apache.org/repos/asf/flink/blob/e236680f/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriterTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriterTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriterTest.java index 019e56d..864d9c1 100644 --- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriterTest.java +++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriterTest.java @@ -18,8 +18,6 @@ package org.apache.flink.streaming.connectors.fs; -import org.apache.flink.api.java.tuple.Tuple2; - import org.apache.avro.Schema; import org.apache.avro.file.DataFileConstants; import org.junit.Test; @@ -47,11 +45,11 @@ public class AvroKeyValueSinkWriterTest { AvroKeyValueSinkWriter<String, String> writer = new AvroKeyValueSinkWriter(properties); writer.setSyncOnFlush(true); - Writer<Tuple2<String, String>> other = writer.duplicate(); + AvroKeyValueSinkWriter<String, String> other = writer.duplicate(); - assertTrue(writer.equals(other)); + assertTrue(StreamWriterBaseComparator.equals(writer, other)); writer.setSyncOnFlush(false); - assertFalse(writer.equals(other)); + assertFalse(StreamWriterBaseComparator.equals(writer, other)); } } http://git-wip-us.apache.org/repos/asf/flink/blob/e236680f/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java index 93f6d52..86821a5 100644 --- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java +++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java @@ -930,7 +930,7 @@ public class RollingSinkITCase extends TestLogger { } @Override - public Writer<T> duplicate() { + public StreamWriterWithConfigCheck<T> duplicate() { return new StreamWriterWithConfigCheck<>(key, expect); } } http://git-wip-us.apache.org/repos/asf/flink/blob/e236680f/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriterTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriterTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriterTest.java index 7ea2264..44716d3 100644 --- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriterTest.java +++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriterTest.java @@ -18,8 +18,6 @@ package org.apache.flink.streaming.connectors.fs; -import org.apache.flink.api.java.tuple.Tuple2; - import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.junit.Test; @@ -36,11 +34,11 @@ public class SequenceFileWriterTest { public void testDuplicate() { SequenceFileWriter<Text, Text> writer = new SequenceFileWriter("BZ", SequenceFile.CompressionType.BLOCK); writer.setSyncOnFlush(true); - Writer<Tuple2<Text, Text>> other = writer.duplicate(); + SequenceFileWriter<Text, Text> other = writer.duplicate(); - assertTrue(writer.equals(other)); + assertTrue(StreamWriterBaseComparator.equals(writer, other)); writer.setSyncOnFlush(false); - assertFalse(writer.equals(other)); + assertFalse(StreamWriterBaseComparator.equals(writer, other)); } } http://git-wip-us.apache.org/repos/asf/flink/blob/e236680f/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StreamWriterBaseComparator.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StreamWriterBaseComparator.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StreamWriterBaseComparator.java new file mode 100644 index 0000000..9472c29 --- /dev/null +++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StreamWriterBaseComparator.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.fs; + +import org.apache.hadoop.io.Writable; + +import java.util.Objects; + +/** + * Helper class to perform partial comparisons of {@link StreamWriterBase} instances. During comparisons + * it ignores changes in underlying output streams. + */ +public class StreamWriterBaseComparator { + + public static <T> boolean equals( + StreamWriterBase<T> writer1, + StreamWriterBase<T> writer2) { + return Objects.equals(writer1.isSyncOnFlush(), writer2.isSyncOnFlush()); + } + + public static <K, V> boolean equals( + AvroKeyValueSinkWriter<K, V> writer1, + AvroKeyValueSinkWriter<K, V> writer2) { + return equals((StreamWriterBase) writer1, (StreamWriterBase) writer2) && + Objects.equals(writer1.getProperties(), writer2.getProperties()); + } + + public static <K extends Writable, V extends Writable> boolean equals( + SequenceFileWriter<K, V> writer1, + SequenceFileWriter<K, V> writer2) { + return equals((StreamWriterBase) writer1, (StreamWriterBase) writer2) && + Objects.equals(writer1.getCompressionCodecName(), writer2.getCompressionCodecName()) && + Objects.equals(writer1.getCompressionType(), writer2.getCompressionType()) && + Objects.equals(writer1.getKeyClass(), writer2.getKeyClass()) && + Objects.equals(writer1.getValueClass(), writer2.getValueClass()); + } + + public static <T> boolean equals( + StringWriter<T> writer1, + StringWriter<T> writer2) { + return equals((StreamWriterBase) writer1, (StreamWriterBase) writer2) && + Objects.equals(writer1.getCharsetName(), writer2.getCharsetName()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e236680f/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java index 488f860..7009d94 100644 --- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java +++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java @@ -34,12 +34,12 @@ public class StringWriterTest { public void testDuplicate() { StringWriter<String> writer = new StringWriter(StandardCharsets.UTF_16.name()); writer.setSyncOnFlush(true); - Writer<String> other = writer.duplicate(); + StringWriter<String> other = writer.duplicate(); - assertTrue(writer.equals(other)); + assertTrue(StreamWriterBaseComparator.equals(writer, other)); writer.setSyncOnFlush(false); - assertFalse(writer.equals(other)); - assertFalse(writer.equals(new StringWriter<>())); + assertFalse(StreamWriterBaseComparator.equals(writer, other)); + assertFalse(StreamWriterBaseComparator.equals(writer, new StringWriter<>())); } } http://git-wip-us.apache.org/repos/asf/flink/blob/e236680f/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java index 362c078..dc84846 100644 --- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java +++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java @@ -28,7 +28,6 @@ import org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter; import org.apache.flink.streaming.connectors.fs.Clock; import org.apache.flink.streaming.connectors.fs.SequenceFileWriter; import org.apache.flink.streaming.connectors.fs.StringWriter; -import org.apache.flink.streaming.connectors.fs.Writer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; @@ -918,7 +917,7 @@ public class BucketingSinkTest extends TestLogger { } @Override - public Writer<Tuple2<K, V>> duplicate() { + public StreamWriterWithConfigCheck<K, V> duplicate() { return new StreamWriterWithConfigCheck<>(properties, key, expect); } }
