Repository: flink
Updated Branches:
  refs/heads/master a9b5579b3 -> e236680f1


[hotfix][filesystem] Remove incorrect equals methods in StreamWriters

This closes #6262.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e236680f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e236680f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e236680f

Branch: refs/heads/master
Commit: e236680f14ebbaf97243dc15718aad93b68b584a
Parents: a9b5579
Author: Piotr Nowojski <[email protected]>
Authored: Thu Jul 5 11:28:35 2018 +0200
Committer: Piotr Nowojski <[email protected]>
Committed: Thu Jul 5 15:19:41 2018 +0200

----------------------------------------------------------------------
 .../connectors/fs/AvroKeyValueSinkWriter.java   | 25 +-------
 .../connectors/fs/SequenceFileWriter.java       | 36 +++++-------
 .../connectors/fs/StreamWriterBase.java         | 22 +------
 .../streaming/connectors/fs/StringWriter.java   | 25 +-------
 .../fs/AvroKeyValueSinkWriterTest.java          |  8 +--
 .../connectors/fs/RollingSinkITCase.java        |  2 +-
 .../connectors/fs/SequenceFileWriterTest.java   |  8 +--
 .../fs/StreamWriterBaseComparator.java          | 60 ++++++++++++++++++++
 .../connectors/fs/StringWriterTest.java         |  8 +--
 .../fs/bucketing/BucketingSinkTest.java         |  3 +-
 10 files changed, 93 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e236680f/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
 
b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
index 6b2f7d6..0f73e8c 100644
--- 
a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
+++ 
b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
@@ -40,7 +40,6 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.Map;
-import java.util.Objects;
 
 /**
 * Implementation of AvroKeyValue writer that can be used in Sink.
@@ -204,7 +203,7 @@ public class AvroKeyValueSinkWriter<K, V> extends 
StreamWriterBase<Tuple2<K, V>>
        }
 
        @Override
-       public Writer<Tuple2<K, V>> duplicate() {
+       public AvroKeyValueSinkWriter<K, V> duplicate() {
                return new AvroKeyValueSinkWriter<>(this);
        }
 
@@ -335,25 +334,7 @@ public class AvroKeyValueSinkWriter<K, V> extends 
StreamWriterBase<Tuple2<K, V>>
                }
        }
 
-       @Override
-       public int hashCode() {
-               return Objects.hash(super.hashCode(), properties);
-       }
-
-       @Override
-       public boolean equals(Object other) {
-               if (this == other) {
-                       return true;
-               }
-               if (other == null) {
-                       return false;
-               }
-               if (getClass() != other.getClass()) {
-                       return false;
-               }
-               AvroKeyValueSinkWriter<K, V> writer = 
(AvroKeyValueSinkWriter<K, V>) other;
-               // field comparison
-               return Objects.equals(properties, writer.properties)
-                       && super.equals(other);
+       Map<String, String> getProperties() {
+               return properties;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e236680f/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
 
b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
index 2f42ef7..17b16dd 100644
--- 
a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
+++ 
b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 
 import java.io.IOException;
-import java.util.Objects;
 
 /**
  * A {@link Writer} that writes the bucket files as Hadoop {@link SequenceFile 
SequenceFiles}.
@@ -152,32 +151,23 @@ public class SequenceFileWriter<K extends Writable, V 
extends Writable> extends
        }
 
        @Override
-       public Writer<Tuple2<K, V>> duplicate() {
+       public SequenceFileWriter<K, V> duplicate() {
                return new SequenceFileWriter<>(this);
        }
 
-       @Override
-       public int hashCode() {
-               return Objects.hash(super.hashCode(), compressionCodecName, 
compressionType, keyClass, valueClass);
+       String getCompressionCodecName() {
+               return compressionCodecName;
        }
 
-       @Override
-       public boolean equals(Object other) {
-               if (this == other) {
-                       return true;
-               }
-               if (other == null) {
-                       return false;
-               }
-               if (getClass() != other.getClass()) {
-                       return false;
-               }
-               SequenceFileWriter<K, V> writer = (SequenceFileWriter<K, V>) 
other;
-               // field comparison
-               return Objects.equals(compressionCodecName, 
writer.compressionCodecName)
-                       && Objects.equals(compressionType, 
writer.compressionType)
-                       && Objects.equals(keyClass, writer.keyClass)
-                       && Objects.equals(valueClass, writer.valueClass)
-                       && super.equals(other);
+       SequenceFile.CompressionType getCompressionType() {
+               return compressionType;
+       }
+
+       Class<K> getKeyClass() {
+               return keyClass;
+       }
+
+       Class<V> getValueClass() {
+               return valueClass;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e236680f/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
 
b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
index f625ef3..d3035a5 100644
--- 
a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
+++ 
b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
-import java.util.Objects;
 
 /**
  * Base class for {@link Writer Writers} that write to a {@link 
FSDataOutputStream}.
@@ -102,24 +101,7 @@ public abstract class StreamWriterBase<T> implements 
Writer<T> {
                }
        }
 
-       @Override
-       public int hashCode() {
-               return Boolean.hashCode(syncOnFlush);
-       }
-
-       @Override
-       public boolean equals(Object other) {
-               if (this == other) {
-                       return true;
-               }
-               if (other == null) {
-                       return false;
-               }
-               if (getClass() != other.getClass()) {
-                       return false;
-               }
-               StreamWriterBase<T> writer = (StreamWriterBase<T>) other;
-               // field comparison
-               return Objects.equals(syncOnFlush, writer.syncOnFlush);
+       public boolean isSyncOnFlush() {
+               return syncOnFlush;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e236680f/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
 
b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
index 5c81b15..122bc7f 100644
--- 
a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
+++ 
b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
@@ -26,7 +26,6 @@ import java.io.IOException;
 import java.nio.charset.Charset;
 import java.nio.charset.IllegalCharsetNameException;
 import java.nio.charset.UnsupportedCharsetException;
-import java.util.Objects;
 
 /**
  * A {@link Writer} that uses {@code toString()} on the input elements and 
writes them to
@@ -87,29 +86,11 @@ public class StringWriter<T> extends StreamWriterBase<T> {
        }
 
        @Override
-       public Writer<T> duplicate() {
+       public StringWriter<T> duplicate() {
                return new StringWriter<>(this);
        }
 
-       @Override
-       public int hashCode() {
-               return Objects.hash(super.hashCode(), charsetName);
-       }
-
-       @Override
-       public boolean equals(Object other) {
-               if (this == other) {
-                       return true;
-               }
-               if (other == null) {
-                       return false;
-               }
-               if (getClass() != other.getClass()) {
-                       return false;
-               }
-               StringWriter<T> writer = (StringWriter<T>) other;
-               // field comparison
-               return Objects.equals(charsetName, writer.charsetName)
-                       && super.equals(other);
+       String getCharsetName() {
+               return charsetName;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e236680f/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriterTest.java
 
b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriterTest.java
index 019e56d..864d9c1 100644
--- 
a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriterTest.java
+++ 
b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriterTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.streaming.connectors.fs;
 
-import org.apache.flink.api.java.tuple.Tuple2;
-
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileConstants;
 import org.junit.Test;
@@ -47,11 +45,11 @@ public class AvroKeyValueSinkWriterTest {
 
                AvroKeyValueSinkWriter<String, String> writer = new 
AvroKeyValueSinkWriter(properties);
                writer.setSyncOnFlush(true);
-               Writer<Tuple2<String, String>> other = writer.duplicate();
+               AvroKeyValueSinkWriter<String, String> other = 
writer.duplicate();
 
-               assertTrue(writer.equals(other));
+               assertTrue(StreamWriterBaseComparator.equals(writer, other));
 
                writer.setSyncOnFlush(false);
-               assertFalse(writer.equals(other));
+               assertFalse(StreamWriterBaseComparator.equals(writer, other));
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e236680f/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
 
b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
index 93f6d52..86821a5 100644
--- 
a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
+++ 
b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
@@ -930,7 +930,7 @@ public class RollingSinkITCase extends TestLogger {
                }
 
                @Override
-               public Writer<T> duplicate() {
+               public StreamWriterWithConfigCheck<T> duplicate() {
                        return new StreamWriterWithConfigCheck<>(key, expect);
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/e236680f/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriterTest.java
 
b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriterTest.java
index 7ea2264..44716d3 100644
--- 
a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriterTest.java
+++ 
b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriterTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.streaming.connectors.fs;
 
-import org.apache.flink.api.java.tuple.Tuple2;
-
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.junit.Test;
@@ -36,11 +34,11 @@ public class SequenceFileWriterTest {
        public void testDuplicate() {
                SequenceFileWriter<Text, Text> writer = new 
SequenceFileWriter("BZ", SequenceFile.CompressionType.BLOCK);
                writer.setSyncOnFlush(true);
-               Writer<Tuple2<Text, Text>> other = writer.duplicate();
+               SequenceFileWriter<Text, Text> other = writer.duplicate();
 
-               assertTrue(writer.equals(other));
+               assertTrue(StreamWriterBaseComparator.equals(writer, other));
 
                writer.setSyncOnFlush(false);
-               assertFalse(writer.equals(other));
+               assertFalse(StreamWriterBaseComparator.equals(writer, other));
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e236680f/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StreamWriterBaseComparator.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StreamWriterBaseComparator.java
 
b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StreamWriterBaseComparator.java
new file mode 100644
index 0000000..9472c29
--- /dev/null
+++ 
b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StreamWriterBaseComparator.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.fs;
+
+import org.apache.hadoop.io.Writable;
+
+import java.util.Objects;
+
+/**
+ * Helper class to perform partial comparisons of {@link StreamWriterBase} 
instances. During comparisons
+ * it ignores changes in underlying output streams.
+ */
+public class StreamWriterBaseComparator {
+
+       public static <T> boolean equals(
+                       StreamWriterBase<T> writer1,
+                       StreamWriterBase<T> writer2) {
+               return Objects.equals(writer1.isSyncOnFlush(), 
writer2.isSyncOnFlush());
+       }
+
+       public static <K, V> boolean equals(
+                       AvroKeyValueSinkWriter<K, V> writer1,
+                       AvroKeyValueSinkWriter<K, V> writer2) {
+               return equals((StreamWriterBase) writer1, (StreamWriterBase) 
writer2) &&
+                       Objects.equals(writer1.getProperties(), 
writer2.getProperties());
+       }
+
+       public static <K extends Writable, V extends Writable> boolean equals(
+                       SequenceFileWriter<K, V> writer1,
+                       SequenceFileWriter<K, V> writer2) {
+               return equals((StreamWriterBase) writer1, (StreamWriterBase) 
writer2) &&
+                       Objects.equals(writer1.getCompressionCodecName(), 
writer2.getCompressionCodecName()) &&
+                       Objects.equals(writer1.getCompressionType(), 
writer2.getCompressionType()) &&
+                       Objects.equals(writer1.getKeyClass(), 
writer2.getKeyClass()) &&
+                       Objects.equals(writer1.getValueClass(), 
writer2.getValueClass());
+       }
+
+       public static <T> boolean equals(
+               StringWriter<T> writer1,
+               StringWriter<T> writer2) {
+               return equals((StreamWriterBase) writer1, (StreamWriterBase) 
writer2) &&
+                       Objects.equals(writer1.getCharsetName(), 
writer2.getCharsetName());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e236680f/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java
 
b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java
index 488f860..7009d94 100644
--- 
a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java
+++ 
b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java
@@ -34,12 +34,12 @@ public class StringWriterTest {
        public void testDuplicate() {
                StringWriter<String> writer = new 
StringWriter(StandardCharsets.UTF_16.name());
                writer.setSyncOnFlush(true);
-               Writer<String> other = writer.duplicate();
+               StringWriter<String> other = writer.duplicate();
 
-               assertTrue(writer.equals(other));
+               assertTrue(StreamWriterBaseComparator.equals(writer, other));
 
                writer.setSyncOnFlush(false);
-               assertFalse(writer.equals(other));
-               assertFalse(writer.equals(new StringWriter<>()));
+               assertFalse(StreamWriterBaseComparator.equals(writer, other));
+               assertFalse(StreamWriterBaseComparator.equals(writer, new 
StringWriter<>()));
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e236680f/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
 
b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
index 362c078..dc84846 100644
--- 
a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
+++ 
b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
@@ -28,7 +28,6 @@ import 
org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter;
 import org.apache.flink.streaming.connectors.fs.Clock;
 import org.apache.flink.streaming.connectors.fs.SequenceFileWriter;
 import org.apache.flink.streaming.connectors.fs.StringWriter;
-import org.apache.flink.streaming.connectors.fs.Writer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
@@ -918,7 +917,7 @@ public class BucketingSinkTest extends TestLogger {
                }
 
                @Override
-               public Writer<Tuple2<K, V>> duplicate() {
+               public StreamWriterWithConfigCheck<K, V> duplicate() {
                        return new StreamWriterWithConfigCheck<>(properties, 
key, expect);
                }
        }

Reply via email to