[
https://issues.apache.org/jira/browse/DRILL-4706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15638059#comment-15638059
]
ASF GitHub Bot commented on DRILL-4706:
---------------------------------------
Github user sohami commented on a diff in the pull request:
https://github.com/apache/drill/pull/639#discussion_r86646115
--- Diff:
exec/java-exec/src/test/java/org/apache/drill/exec/planner/fragment/TestLocalAffinityFragmentParallelizer.java
---
@@ -0,0 +1,476 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.fragment;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import mockit.Mocked;
+import mockit.NonStrictExpectations;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Collections;
+
+import static java.lang.Integer.MAX_VALUE;
+import static org.apache.drill.exec.ExecConstants.SLICE_TARGET_DEFAULT;
+import static org.apache.drill.exec.planner.fragment.LocalAffinityFragmentParallelizer.INSTANCE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+
+public class TestLocalAffinityFragmentParallelizer {
+
+  // Create a set of test endpoints
+  private static final DrillbitEndpoint DEP1 = newDrillbitEndpoint("node1", 30010);
+  private static final DrillbitEndpoint DEP2 = newDrillbitEndpoint("node2", 30010);
+  private static final DrillbitEndpoint DEP3 = newDrillbitEndpoint("node3", 30010);
+  private static final DrillbitEndpoint DEP4 = newDrillbitEndpoint("node4", 30010);
+  private static final DrillbitEndpoint DEP5 = newDrillbitEndpoint("node5", 30010);
+
+  @Mocked private Fragment fragment;
+  @Mocked private PhysicalOperator root;
+
+  private static final DrillbitEndpoint newDrillbitEndpoint(String address, int port) {
+    return DrillbitEndpoint.newBuilder().setAddress(address).setControlPort(port).build();
+  }
+
+  private static final ParallelizationParameters newParameters(final long threshold,
+      final int maxWidthPerNode, final int maxGlobalWidth) {
+    return new ParallelizationParameters() {
+      @Override
+      public long getSliceTarget() {
+        return threshold;
+      }
+
+      @Override
+      public int getMaxWidthPerNode() {
+        return maxWidthPerNode;
+      }
+
+      @Override
+      public int getMaxGlobalWidth() {
+        return maxGlobalWidth;
+      }
+
+      /**
+       * {@link LocalAffinityFragmentParallelizer} doesn't use the affinity factor.
+       * @return 0.0, since the affinity factor is unused by this parallelizer.
+       */
+      @Override
+      public double getAffinityFactor() {
+        return 0.0f;
+      }
+    };
+  }
+
+  private final Wrapper newWrapper(double cost, int minWidth, int maxWidth,
+      List<EndpointAffinity> endpointAffinities) {
+    new NonStrictExpectations() {
+      {
+        fragment.getRoot(); result = root;
+      }
+    };
+
+    final Wrapper fragmentWrapper = new Wrapper(fragment, 1);
+    final Stats stats = fragmentWrapper.getStats();
+    stats.setDistributionAffinity(DistributionAffinity.LOCAL);
+    stats.addCost(cost);
+    stats.addMinWidth(minWidth);
+    stats.addMaxWidth(maxWidth);
+    stats.addEndpointAffinities(endpointAffinities);
+    return fragmentWrapper;
+  }
+
+  private void checkEndpointAssignments(List<DrillbitEndpoint> assignedEndpoints,
+      Map<DrillbitEndpoint, Integer> expectedAssignments) throws Exception {
+    Map<DrillbitEndpoint, Integer> endpointAssignments = new HashMap<>();
+    // Count the number of fragments assigned to each endpoint.
+    for (DrillbitEndpoint endpoint : assignedEndpoints) {
+      if (endpointAssignments.containsKey(endpoint)) {
+        endpointAssignments.put(endpoint, endpointAssignments.get(endpoint) + 1);
+      } else {
+        endpointAssignments.put(endpoint, 1);
+      }
+    }
+
+    // Verify the number of fragments assigned to each endpoint against the expected value.
+    for (Map.Entry<DrillbitEndpoint, Integer> endpointAssignment : endpointAssignments.entrySet()) {
+      assertEquals(expectedAssignments.get(endpointAssignment.getKey()).intValue(),
+          endpointAssignment.getValue().intValue());
+    }
+  }
+
+  @Test
+  public void testEqualLocalWorkUnitsUnderNodeLimit() throws Exception {
+    final Wrapper wrapper = newWrapper(200, 1, 80, /* cost, minWidth, maxWidth */
+        ImmutableList.of( /* endpointAffinities */
+            /* For local affinity, we only care about numLocalWorkUnits, the last column below. */
+            /* endpoint, affinity_value, mandatory, maxWidth, numLocalWorkUnits */
+            new EndpointAffinity(DEP1, 0.15, false, MAX_VALUE, 16),
+            new EndpointAffinity(DEP2, 0.15, false, MAX_VALUE, 16),
+            new EndpointAffinity(DEP3, 0.10, false, MAX_VALUE, 16),
+            new EndpointAffinity(DEP4, 0.20, false, MAX_VALUE, 16),
+            new EndpointAffinity(DEP5, 0.20, false, MAX_VALUE, 16)
+        ));
+    INSTANCE.parallelizeFragment(wrapper,
+        newParameters(1 /* sliceTarget */, 23 /* maxWidthPerNode */, 200 /* globalMaxWidth */),
+        ImmutableList.of(DEP1, DEP2, DEP3, DEP4, DEP5));
+    // Every endpoint should get 16 fragments assigned because
+    // the parallelization maxWidth (80) is below the globalMaxWidth (200) and
+    // the localWorkUnits of every node is below maxWidthPerNode (23).
+    Map<DrillbitEndpoint, Integer> expectedAssignments = ImmutableMap.of(DEP1, 16,
+        DEP2, 16,
+        DEP3, 16,
+        DEP4, 16,
+        DEP5, 16);
+    // Expect the fragment parallelization to be 80 (16 * 5).
+    assertEquals(80, wrapper.getWidth());
+
+    final List<DrillbitEndpoint> assignedEndpoints = wrapper.getAssignedEndpoints();
+    assertEquals(80, assignedEndpoints.size());
+    assertTrue(assignedEndpoints.contains(DEP1));
+    assertTrue(assignedEndpoints.contains(DEP2));
+    assertTrue(assignedEndpoints.contains(DEP3));
+    assertTrue(assignedEndpoints.contains(DEP4));
+    assertTrue(assignedEndpoints.contains(DEP5));
+
+    checkEndpointAssignments(assignedEndpoints, expectedAssignments);
+  }
+
+  @Test
+  public void testEqualLocalWorkUnitsAboveNodeLimit() throws Exception {
+    final Wrapper wrapper = newWrapper(200, 1, 80, /* cost, minWidth, maxWidth */
+        ImmutableList.of( /* endpointAffinities */
+            /* For local affinity, we only care about numLocalWorkUnits, the last column below. */
+            /* endpoint, affinity_value, mandatory, maxWidth, numLocalWorkUnits */
+            new EndpointAffinity(DEP1, 0.15, false, MAX_VALUE, 16),
+            new EndpointAffinity(DEP2, 0.15, false, MAX_VALUE, 16),
+            new EndpointAffinity(DEP3, 0.10, false, MAX_VALUE, 16),
+            new EndpointAffinity(DEP4, 0.20, false, MAX_VALUE, 16),
+            new EndpointAffinity(DEP5, 0.20, false, MAX_VALUE, 16)
+        ));
+    INSTANCE.parallelizeFragment(wrapper,
+        newParameters(1 /* sliceTarget */, 8 /* maxWidthPerNode */, 200 /* globalMaxWidth */),
+        ImmutableList.of(DEP1, DEP2, DEP3, DEP4, DEP5));
+    // Every endpoint should get 8 fragments assigned because
+    // maxWidthPerNode is 8 and the localWorkUnits of every node is above maxWidthPerNode.
+    // Also, the parallelization maxWidth (80) is below the globalMaxWidth (200).
+    Map<DrillbitEndpoint, Integer> expectedAssignments = ImmutableMap.of(DEP1, 8,
+        DEP2, 8,
+        DEP3, 8,
+        DEP4, 8,
+        DEP5, 8);
+    // Expect the fragment parallelization to be 40 (8 * 5).
+    assertEquals(40, wrapper.getWidth());
+
+    final List<DrillbitEndpoint> assignedEndpoints = wrapper.getAssignedEndpoints();
+    assertEquals(40, assignedEndpoints.size());
+    assertTrue(assignedEndpoints.contains(DEP1));
+    assertTrue(assignedEndpoints.contains(DEP2));
+    assertTrue(assignedEndpoints.contains(DEP3));
+    assertTrue(assignedEndpoints.contains(DEP4));
+    assertTrue(assignedEndpoints.contains(DEP5));
+
+    checkEndpointAssignments(assignedEndpoints, expectedAssignments);
+  }
+
+  @Test
+  public void testUnEqualLocalWorkUnitsUnderNodeLimit() throws Exception {
+    final Wrapper wrapper = newWrapper(200, 1, 80, /* cost, minWidth, maxWidth */
+        ImmutableList.of( /* endpointAffinities */
+            /* For local affinity, we only care about numLocalWorkUnits, the last column below. */
+            /* endpoint, affinity_value, mandatory, maxWidth, numLocalWorkUnits */
+            new EndpointAffinity(DEP1, 0.15, false, MAX_VALUE, 14),
+            new EndpointAffinity(DEP2, 0.15, false, MAX_VALUE, 15),
+            new EndpointAffinity(DEP3, 0.10, false, MAX_VALUE, 16),
+            new EndpointAffinity(DEP4, 0.20, false, MAX_VALUE, 17),
+            new EndpointAffinity(DEP5, 0.20, false, MAX_VALUE, 18)
+        ));
+    INSTANCE.parallelizeFragment(wrapper,
+        newParameters(1 /* sliceTarget */, 23 /* maxWidthPerNode */, 200 /* globalMaxWidth */),
+        ImmutableList.of(DEP1, DEP2, DEP3, DEP4, DEP5));
+    // Each DrillbitEndpoint should get as many fragments as it has localWorkUnits, because
+    // the parallelization maxWidth (80) is below the globalMaxWidth (200) and
+    // the localWorkUnits of every node is below maxWidthPerNode (23).
+    Map<DrillbitEndpoint, Integer> expectedAssignments = ImmutableMap.of(DEP1, 14,
+        DEP2, 15,
+        DEP3, 16,
+        DEP4, 17,
+        DEP5, 18);
+    // Expect the fragment parallelization to be 80 (14 + 15 + 16 + 17 + 18).
+    assertEquals(80, wrapper.getWidth());
+
+    // All DrillbitEndpoints should get fragments assigned.
+    final List<DrillbitEndpoint> assignedEndpoints = wrapper.getAssignedEndpoints();
+    assertEquals(80, assignedEndpoints.size());
+    assertTrue(assignedEndpoints.contains(DEP1));
+    assertTrue(assignedEndpoints.contains(DEP2));
+    assertTrue(assignedEndpoints.contains(DEP3));
+    assertTrue(assignedEndpoints.contains(DEP4));
+    assertTrue(assignedEndpoints.contains(DEP5));
+
+    checkEndpointAssignments(assignedEndpoints, expectedAssignments);
+  }
+
+  @Test
+  public void testUnequalLocalWorkUnitsAboveNodeLimit() throws Exception {
+    final Wrapper wrapper = newWrapper(200, 1, 80, /* cost, minWidth, maxWidth */
+        ImmutableList.of( /* endpointAffinities */
+            /* For local affinity, we only care about numLocalWorkUnits, the last column below. */
+            /* endpoint, affinity_value, mandatory, maxWidth, numLocalWorkUnits */
+            new EndpointAffinity(DEP1, 0.15, false, MAX_VALUE, 14),
+            new EndpointAffinity(DEP2, 0.15, false, MAX_VALUE, 15),
+            new EndpointAffinity(DEP3, 0.10, false, MAX_VALUE, 16),
+            new EndpointAffinity(DEP4, 0.20, false, MAX_VALUE, 17),
+            new EndpointAffinity(DEP5, 0.20, false, MAX_VALUE, 18)
+        ));
+    INSTANCE.parallelizeFragment(wrapper,
+        newParameters(1 /* sliceTarget */, 16 /* maxWidthPerNode */, 200 /* globalMaxWidth */),
+        ImmutableList.of(DEP1, DEP2, DEP3, DEP4, DEP5));
+    // Every node should get 16 or fewer fragments assigned since maxWidthPerNode is 16.
+    // Nodes with fewer than 16 localWorkUnits should get as many fragments as their localWorkUnits.
+    // Nodes with 17 and 18 localWorkUnits should get only 16 fragments (they are capped
+    // by maxWidthPerNode).
+    Map<DrillbitEndpoint, Integer> expectedAssignments = ImmutableMap.of(DEP1, 14,
+        DEP2, 15,
+        DEP3, 16,
+        DEP4, 16,
+        DEP5, 16);
+    // Expect the fragment parallelization to be 77 (14 + 15 + 16 + 16 + 16):
+    // maxWidthPerNode is 16 and
+    // the parallelization maxWidth (80) is below the globalMaxWidth (200).
+    assertEquals(77, wrapper.getWidth());
+
+    // All nodes should get fragments assigned.
+    final List<DrillbitEndpoint> assignedEndpoints = wrapper.getAssignedEndpoints();
+    assertEquals(77, assignedEndpoints.size());
+    assertTrue(assignedEndpoints.contains(DEP1));
+    assertTrue(assignedEndpoints.contains(DEP2));
+    assertTrue(assignedEndpoints.contains(DEP3));
+    assertTrue(assignedEndpoints.contains(DEP4));
+    assertTrue(assignedEndpoints.contains(DEP5));
+
+    checkEndpointAssignments(assignedEndpoints, expectedAssignments);
+  }
+
+  @Test
+  public void testTotalWorkUnitsMoreThanGlobalMaxWidth() throws Exception {
+    final Wrapper wrapper = newWrapper(200, 1, 80, /* cost, minWidth, maxWidth */
+        ImmutableList.of( /* endpointAffinities */
+            /* For local affinity, we only care about numLocalWorkUnits, the last column below. */
+            /* endpoint, affinity_value, mandatory, maxWidth, numLocalWorkUnits */
+            new EndpointAffinity(DEP1, 0.15, false, MAX_VALUE, 14),
+            new EndpointAffinity(DEP2, 0.15, false, MAX_VALUE, 15),
+            new EndpointAffinity(DEP3, 0.10, false, MAX_VALUE, 16),
+            new EndpointAffinity(DEP4, 0.20, false, MAX_VALUE, 17),
+            new EndpointAffinity(DEP5, 0.20, false, MAX_VALUE, 18)
+        ));
+    INSTANCE.parallelizeFragment(wrapper,
+        newParameters(1 /* sliceTarget */, 16 /* maxWidthPerNode */, 40 /* globalMaxWidth */),
+        ImmutableList.of(DEP1, DEP2, DEP3, DEP4, DEP5));
+    // The parallelization maxWidth (80) is more than the globalMaxWidth (40).
+    // Expect the fragment parallelization to be 40 (7 + 8 + 8 + 8 + 9).
--- End diff --
It would be great to mention that DEP5 gets 9 fragments instead of DEP4
because DEP5 has more localWorkUnits: we favor nodes with more
localWorkUnits.
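To make that point concrete, here is a minimal, hypothetical sketch of a
largest-remainder proportional split, NOT the actual
LocalAffinityFragmentParallelizer code (the class name, method name, and
tie-breaking rule are invented for illustration): each endpoint gets a share
of the capped width proportional to its localWorkUnits, so with the 40-slot
cap above, node5 (18 units, exact share 9.0) receives 9 fragments while
node4 (17 units, exact share 8.5) is rounded down to 8.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ProportionalWidthSketch {
  // Split totalWidth fragments across endpoints in proportion to their
  // localWorkUnits: floor shares first, then leftover slots by largest
  // fractional remainder (ties broken by insertion order here).
  static Map<String, Integer> distribute(Map<String, Integer> localWorkUnits, int totalWidth) {
    int totalUnits = localWorkUnits.values().stream().mapToInt(Integer::intValue).sum();
    Map<String, Integer> assigned = new LinkedHashMap<>();
    Map<String, Double> remainder = new LinkedHashMap<>();
    List<String> keys = new ArrayList<>(localWorkUnits.keySet());
    int used = 0;
    for (String key : keys) {
      double exact = (double) localWorkUnits.get(key) * totalWidth / totalUnits;
      int floor = (int) exact;              // proportional floor share
      assigned.put(key, floor);
      remainder.put(key, exact - floor);
      used += floor;
    }
    // Hand the remaining slots to the endpoints with the largest remainders.
    keys.sort((a, b) -> Double.compare(remainder.get(b), remainder.get(a)));
    for (int i = 0; i < totalWidth - used; i++) {
      assigned.merge(keys.get(i), 1, Integer::sum);
    }
    return assigned;
  }

  public static void main(String[] args) {
    Map<String, Integer> units = new LinkedHashMap<>();
    units.put("node1", 14);
    units.put("node2", 15);
    units.put("node3", 16);
    units.put("node4", 17);
    units.put("node5", 18);
    // globalMaxWidth caps the total at 40; prints
    // {node1=7, node2=8, node3=8, node4=8, node5=9}
    System.out.println(distribute(units, 40));
  }
}
```

Under this scheme the tie-breaking rule only matters for the leftover slots;
the proportional shares themselves already favor endpoints with more
localWorkUnits (node5's 18 units yield 9 fragments, node4's 17 yield 8),
which is the behavior the test comment should call out.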
> Fragment planning causes Drillbits to read remote chunks when local copies
> are available
> ----------------------------------------------------------------------------------------
>
> Key: DRILL-4706
> URL: https://issues.apache.org/jira/browse/DRILL-4706
> Project: Apache Drill
> Issue Type: Bug
> Components: Query Planning & Optimization
> Affects Versions: 1.6.0
> Environment: CentOS, RHEL
> Reporter: Kunal Khatua
> Assignee: Sorabh Hamirwasia
> Labels: performance, planning
>
> When a table (datasize=70GB) of 160 parquet files (each having a single
> rowgroup and fitting within one chunk) is available on a 10-node setup with
> replication=3; a pure data scan query causes about 2% of the data to be read
> remotely.
> Even with the creation of metadata cache, the planner is selecting a
> sub-optimal plan of executing the SCAN fragments such that some of the data
> is served from a remote server.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)