abstractdog commented on code in PR #6456:
URL: https://github.com/apache/hive/pull/6456#discussion_r3246553372


##########
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/SkewedMergeJoinMonitor.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez.monitoring;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SkewedMergeJoinMonitor {
+
+    private transient long mergeJoinSkewThreshold;
+    private transient boolean mergeJoinSkewAbort;
+    private transient boolean[] skewedKeyFlagged;
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SkewedMergeJoinMonitor.class.getName());
+
+    public SkewedMergeJoinMonitor(long mergeJoinSkewThreshold, boolean 
mergeJoinSkewAbort, int maxAlias) {
+        this.mergeJoinSkewThreshold = mergeJoinSkewThreshold;
+        this.mergeJoinSkewAbort = mergeJoinSkewAbort;
+        skewedKeyFlagged = new boolean[maxAlias];
+    }
+
+    public boolean isActive() {

Review Comment:
   can be private, or package-private with @VisibleForTesting if unit tests need it



##########
ql/src/test/queries/clientnegative/mergejoin_skew_abort.q:
##########
@@ -0,0 +1,20 @@
+SET hive.vectorized.execution.enabled=false;
+set hive.mapred.mode=nonstrict;
+set hive.explain.user=false;
+set hive.cbo.enable=false;

Review Comment:
   cbo should be enabled, we can simply delete this



##########
ql/src/test/queries/clientpositive/mergejoin_skew_warn.q:
##########
@@ -0,0 +1,35 @@
+SET hive.vectorized.execution.enabled=false;
+set hive.mapred.mode=nonstrict;
+set hive.explain.user=false;
+set hive.cbo.enable=false;

Review Comment:
   cbo should be enabled, we can simply delete this



##########
ql/src/test/queries/clientnegative/mergejoin_skew_abort.q:
##########
@@ -0,0 +1,20 @@
+SET hive.vectorized.execution.enabled=false;
+set hive.mapred.mode=nonstrict;
+set hive.explain.user=false;
+set hive.cbo.enable=false;
+set hive.auto.convert.join=false;
+set hive.optimize.ppd=false;
+-- merge join observability config, with true should throw exception after skew
+-- join detected beyond the threshold
+set hive.merge.join.skew.threshold=2;
+set hive.merge.join.skew.abort=true;
+
+CREATE TABLE merge_skew_abort_a (key int, value string) STORED AS TEXTFILE;
+CREATE TABLE merge_skew_abort_b (key int, value string) STORED AS TEXTFILE;

Review Comment:
   remove "STORED AS TEXTFILE", we use default ORC unless there is a reason to 
do otherwise



##########
ql/src/test/queries/clientnegative/mergejoin_skew_abort.q:
##########
@@ -0,0 +1,20 @@
+SET hive.vectorized.execution.enabled=false;

Review Comment:
   vectorization should be enabled, we can simply delete this



##########
ql/src/test/org/apache/hadoop/hive/ql/exec/TestCommonMergeJoinSkewThreshold.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.ql.exec.tez.monitoring.SkewedMergeJoinMonitor;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestCommonMergeJoinSkewThreshold {
+
+  private CommonMergeJoinOperator op;
+
+  @Before
+  public void setUp() {
+    op = new CommonMergeJoinOperator();
+  }
+
+  @Test
+  public void testDisabled_noWarnNoThrow() throws HiveException {
+    op.skewedMergeJoinMonitor = new SkewedMergeJoinMonitor(
+            -1L, false, 4);
+
+    op.skewedMergeJoinMonitor.checkMergeJoinSkew((byte) 0, Long.MAX_VALUE);
+  }
+
+  @Test
+  public void testBelowThreshold_isOk() throws HiveException {
+    op.skewedMergeJoinMonitor = new SkewedMergeJoinMonitor(
+            1000L, false, 4);
+    op.skewedMergeJoinMonitor.checkMergeJoinSkew((byte) 0, 999L);
+  }
+
+  @Test
+  public void testAtThreshold_warnOnce() throws HiveException {
+    final var monitor = new SkewedMergeJoinMonitor(
+            500L, false, 4);
+
+    op.skewedMergeJoinMonitor = monitor;
+
+    // should warn without throwing
+    op.skewedMergeJoinMonitor.checkMergeJoinSkew((byte) 0, 500L);
+
+    Assert.assertTrue("skewedKeyFlagged[0] must be set after the first 
crossing",
+        monitor.isFlagged(0));
+
+  }
+
+  @Test
+  public void testFlagsAreIndependentPerTag() throws HiveException {
+    final var monitor = new SkewedMergeJoinMonitor(
+            100L, false, 4);
+    op.skewedMergeJoinMonitor = monitor;
+
+    op.skewedMergeJoinMonitor.checkMergeJoinSkew((byte) 0, 200L);
+    Assert.assertTrue("tag 0 should be flagged", monitor.isFlagged(0));
+    Assert.assertFalse("tag 1 should still be clear", monitor.isFlagged(1));
+
+    op.skewedMergeJoinMonitor.checkMergeJoinSkew((byte) 1, 150L);
+    Assert.assertTrue("tag 1 should now be flagged", monitor.isFlagged(1));
+  }
+
+  @Test
+  public void testAbortMode_belowThreshold_noThrow() throws HiveException {
+    op.skewedMergeJoinMonitor = new SkewedMergeJoinMonitor(
+            100L, true, 4);
+
+    op.skewedMergeJoinMonitor.checkMergeJoinSkew((byte) 0, 99L);

Review Comment:
   try to assert on something that reflects the expected outcome "below 
threshold"



##########
ql/src/test/queries/clientnegative/mergejoin_skew_abort.q:
##########
@@ -0,0 +1,20 @@
+SET hive.vectorized.execution.enabled=true;
+set hive.mapred.mode=nonstrict;
+set hive.explain.user=false;
+set hive.cbo.enable=true;

Review Comment:
   this is not needed, true is default



##########
ql/src/test/queries/clientnegative/mergejoin_skew_abort.q:
##########
@@ -0,0 +1,20 @@
+SET hive.vectorized.execution.enabled=true;
+set hive.mapred.mode=nonstrict;
+set hive.explain.user=false;
+set hive.cbo.enable=true;
+set hive.auto.convert.join=false;
+set hive.optimize.ppd=false;

Review Comment:
   is `hive.optimize.ppd=false` needed?



##########
ql/src/test/queries/clientpositive/mergejoin_skew_warn.q:
##########
@@ -0,0 +1,35 @@
+SET hive.vectorized.execution.enabled=true;
+set hive.mapred.mode=nonstrict;

Review Comment:
   is `hive.mapred.mode=nonstrict` needed?



##########
ql/src/test/queries/clientpositive/mergejoin_skew_warn.q:
##########
@@ -0,0 +1,35 @@
+SET hive.vectorized.execution.enabled=false;

Review Comment:
   vectorization should be enabled, we can simply delete this



##########
common/src/java/org/apache/hadoop/hive/conf/HiveConf.java:
##########
@@ -1737,6 +1737,11 @@ public static enum ConfVars {
         "How many rows in the right-most join operand Hive should buffer 
before emitting the join result."),
     HIVE_JOIN_CACHE_SIZE("hive.join.cache.size", 25000,
         "How many rows in the joining tables (except the streaming table) 
should be cached in memory."),
+    HIVE_MERGE_JOIN_SKEW_THRESHOLD("hive.merge.join.skew.threshold", -1L,
+        "Maximum number of rows allowed per join key in a single Tez 
sort-merge join task before a "

Review Comment:
   we can remove "Tez"
   even though "Tez" is the only execution engine we're currently supporting, this 
feature is theoretically orthogonal to that, so this is rather "Maximum number 
of rows allowed per join key in a single sort-merge reducer join task before..."



##########
ql/src/test/org/apache/hadoop/hive/ql/exec/TestCommonMergeJoinSkewThreshold.java:
##########


Review Comment:
   in general: in Hive, 2 spaces indentation is used



##########
ql/src/test/queries/clientpositive/mergejoin_skew_warn.q:
##########
@@ -0,0 +1,35 @@
+SET hive.vectorized.execution.enabled=false;
+set hive.mapred.mode=nonstrict;
+set hive.explain.user=false;
+set hive.cbo.enable=false;
+set hive.auto.convert.join=false;
+set hive.optimize.ppd=false;
+set hive.merge.join.skew.threshold=2;
+set hive.merge.join.skew.abort=false;
+
+-- SORT_QUERY_RESULTS
+
+CREATE TABLE merge_skew_warn_a (key int, value string) STORED AS TEXTFILE;
+CREATE TABLE merge_skew_warn_b (key int, value string) STORED AS TEXTFILE;

Review Comment:
   remove "STORED AS TEXTFILE", we use default ORC unless there is a reason to 
do otherwise



##########
ql/src/test/queries/clientpositive/mergejoin_skew_warn.q:
##########
@@ -0,0 +1,35 @@
+SET hive.vectorized.execution.enabled=true;

Review Comment:
   this is not needed, true is default



##########
ql/src/test/org/apache/hadoop/hive/ql/exec/TestCommonMergeJoinSkewThreshold.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.ql.exec.tez.monitoring.SkewedMergeJoinMonitor;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestCommonMergeJoinSkewThreshold {
+
+  private CommonMergeJoinOperator op;
+
+  @Before
+  public void setUp() {
+    op = new CommonMergeJoinOperator();
+  }
+
+  @Test
+  public void testDisabled_noWarnNoThrow() throws HiveException {
+    op.skewedMergeJoinMonitor = new SkewedMergeJoinMonitor(
+            -1L, false, 4);
+
+    op.skewedMergeJoinMonitor.checkMergeJoinSkew((byte) 0, Long.MAX_VALUE);

Review Comment:
   try to assert on something that reflects the expected outcome



##########
ql/src/test/queries/clientpositive/mergejoin_skew_warn.q:
##########
@@ -0,0 +1,35 @@
+SET hive.vectorized.execution.enabled=true;
+set hive.mapred.mode=nonstrict;
+set hive.explain.user=false;
+set hive.cbo.enable=true;

Review Comment:
   this is not needed, true is default



##########
ql/src/test/org/apache/hadoop/hive/ql/exec/TestCommonMergeJoinSkewThreshold.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.ql.exec.tez.monitoring.SkewedMergeJoinMonitor;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestCommonMergeJoinSkewThreshold {
+
+  private CommonMergeJoinOperator op;
+
+  @Before
+  public void setUp() {
+    op = new CommonMergeJoinOperator();
+  }
+
+  @Test
+  public void testDisabled_noWarnNoThrow() throws HiveException {
+    op.skewedMergeJoinMonitor = new SkewedMergeJoinMonitor(
+            -1L, false, 4);
+
+    op.skewedMergeJoinMonitor.checkMergeJoinSkew((byte) 0, Long.MAX_VALUE);
+  }
+
+  @Test
+  public void testBelowThreshold_isOk() throws HiveException {
+    op.skewedMergeJoinMonitor = new SkewedMergeJoinMonitor(
+            1000L, false, 4);
+    op.skewedMergeJoinMonitor.checkMergeJoinSkew((byte) 0, 999L);

Review Comment:
   try to assert on something that reflects the expected outcome



##########
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/SkewedJoinMonitor.java:
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez.monitoring;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+public interface SkewedJoinMonitor {
+
+    void checkMergeJoinSkew(byte alias, long rowCount) throws HiveException;

Review Comment:
   need basic javadoc for method and interface regarding:
   1. when the method is called
   2. what is `alias` and `rowCount`?
   3. what's the possible outcome?



##########
ql/src/test/org/apache/hadoop/hive/ql/exec/TestCommonMergeJoinSkewThreshold.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.ql.exec.tez.monitoring.SkewedMergeJoinMonitor;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestCommonMergeJoinSkewThreshold {
+
+  private CommonMergeJoinOperator op;
+
+  @Before
+  public void setUp() {
+    op = new CommonMergeJoinOperator();
+  }
+
+  @Test
+  public void testDisabled_noWarnNoThrow() throws HiveException {
+    op.skewedMergeJoinMonitor = new SkewedMergeJoinMonitor(
+            -1L, false, 4);
+
+    op.skewedMergeJoinMonitor.checkMergeJoinSkew((byte) 0, Long.MAX_VALUE);
+  }
+
+  @Test
+  public void testBelowThreshold_isOk() throws HiveException {
+    op.skewedMergeJoinMonitor = new SkewedMergeJoinMonitor(
+            1000L, false, 4);
+    op.skewedMergeJoinMonitor.checkMergeJoinSkew((byte) 0, 999L);
+  }
+
+  @Test
+  public void testAtThreshold_warnOnce() throws HiveException {
+    final var monitor = new SkewedMergeJoinMonitor(
+            500L, false, 4);
+
+    op.skewedMergeJoinMonitor = monitor;
+
+    // should warn without throwing
+    op.skewedMergeJoinMonitor.checkMergeJoinSkew((byte) 0, 500L);
+
+    Assert.assertTrue("skewedKeyFlagged[0] must be set after the first 
crossing",
+        monitor.isFlagged(0));
+

Review Comment:
   nit: extra line break is not needed



##########
ql/src/test/queries/clientnegative/mergejoin_skew_abort.q:
##########
@@ -0,0 +1,20 @@
+SET hive.vectorized.execution.enabled=true;

Review Comment:
   this is not needed, true is default



##########
ql/src/test/queries/clientpositive/mergejoin_skew_warn.q:
##########
@@ -0,0 +1,35 @@
+SET hive.vectorized.execution.enabled=true;
+set hive.mapred.mode=nonstrict;
+set hive.explain.user=false;
+set hive.cbo.enable=true;
+set hive.auto.convert.join=false;
+set hive.optimize.ppd=false;

Review Comment:
   is `hive.optimize.ppd=false` needed?



##########
ql/src/test/queries/clientnegative/mergejoin_skew_abort.q:
##########
@@ -0,0 +1,20 @@
+SET hive.vectorized.execution.enabled=true;
+set hive.mapred.mode=nonstrict;

Review Comment:
   is `hive.mapred.mode=nonstrict` needed?



##########
common/src/java/org/apache/hadoop/hive/conf/HiveConf.java:
##########
@@ -1737,6 +1737,11 @@ public static enum ConfVars {
         "How many rows in the right-most join operand Hive should buffer 
before emitting the join result."),
     HIVE_JOIN_CACHE_SIZE("hive.join.cache.size", 25000,
         "How many rows in the joining tables (except the streaming table) 
should be cached in memory."),
+    HIVE_MERGE_JOIN_SKEW_THRESHOLD("hive.merge.join.skew.threshold", -1L,
+        "Maximum number of rows allowed per join key in a single Tez 
sort-merge join task before a "
+        + "skew event is reported."),
+    HIVE_MERGE_JOIN_SKEW_ABORT("hive.merge.join.skew.abort", false,
+        "When set to true and the row count is equal to 
hive.merge.join.skew.threshold, the Tez task will be aborted."),

Review Comment:
   maybe remove "Tez" from here too



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to