[
https://issues.apache.org/jira/browse/DRILL-4706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15638058#comment-15638058
]
ASF GitHub Bot commented on DRILL-4706:
---------------------------------------
Github user sohami commented on a diff in the pull request:
https://github.com/apache/drill/pull/639#discussion_r86602221
--- Diff:
exec/java-exec/src/test/java/org/apache/drill/exec/planner/fragment/TestLocalAffinityFragmentParallelizer.java
---
@@ -0,0 +1,476 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.fragment;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import mockit.Mocked;
+import mockit.NonStrictExpectations;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Collections;
+
+import static java.lang.Integer.MAX_VALUE;
+import static org.apache.drill.exec.ExecConstants.SLICE_TARGET_DEFAULT;
+import static org.apache.drill.exec.planner.fragment.LocalAffinityFragmentParallelizer.INSTANCE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+
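+/** Unit tests for {@link LocalAffinityFragmentParallelizer} endpoint assignment. */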
+public class TestLocalAffinityFragmentParallelizer {
+
+ // Create a set of test endpoints
+  private static final DrillbitEndpoint DEP1 = newDrillbitEndpoint("node1", 30010);
+  private static final DrillbitEndpoint DEP2 = newDrillbitEndpoint("node2", 30010);
+  private static final DrillbitEndpoint DEP3 = newDrillbitEndpoint("node3", 30010);
+  private static final DrillbitEndpoint DEP4 = newDrillbitEndpoint("node4", 30010);
+  private static final DrillbitEndpoint DEP5 = newDrillbitEndpoint("node5", 30010);
+
+ @Mocked private Fragment fragment;
+ @Mocked private PhysicalOperator root;
+
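+  /** Helper: builds a DrillbitEndpoint with the given address and control port. */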
+  private static final DrillbitEndpoint newDrillbitEndpoint(String address, int port) {
+    return DrillbitEndpoint.newBuilder().setAddress(address).setControlPort(port).build();
+  }
+
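+  /** Helper: creates ParallelizationParameters with the given slice target and width limits. */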
+  private static final ParallelizationParameters newParameters(final long threshold, final int maxWidthPerNode,
+      final int maxGlobalWidth) {
+ return new ParallelizationParameters() {
+ @Override
+ public long getSliceTarget() {
+ return threshold;
+ }
+
+ @Override
+ public int getMaxWidthPerNode() {
+ return maxWidthPerNode;
+ }
+
+ @Override
+ public int getMaxGlobalWidth() {
+ return maxGlobalWidth;
+ }
+
+      /**
+       * {@link LocalAffinityFragmentParallelizer} doesn't use the affinity factor.
+       */
+      @Override
+      public double getAffinityFactor() {
+        return 0.0;
+ }
+ };
+ }
+
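+  /** Helper: creates a fragment Wrapper with LOCAL distribution affinity and the given stats. */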
+  private final Wrapper newWrapper(double cost, int minWidth, int maxWidth,
+      List<EndpointAffinity> endpointAffinities) {
+ new NonStrictExpectations() {
+ {
+ fragment.getRoot(); result = root;
+ }
+ };
+
+ final Wrapper fragmentWrapper = new Wrapper(fragment, 1);
+ final Stats stats = fragmentWrapper.getStats();
+ stats.setDistributionAffinity(DistributionAffinity.LOCAL);
+ stats.addCost(cost);
+ stats.addMinWidth(minWidth);
+ stats.addMaxWidth(maxWidth);
+ stats.addEndpointAffinities(endpointAffinities);
+ return fragmentWrapper;
+ }
+
+  private void checkEndpointAssignments(List<DrillbitEndpoint> assignedEndpoints,
+      Map<DrillbitEndpoint, Integer> expectedAssignments) throws Exception {
+    Map<DrillbitEndpoint, Integer> endpointAssignments = new HashMap<>();
+ // Count the number of fragments assigned to each endpoint.
+ for (DrillbitEndpoint endpoint: assignedEndpoints) {
+ if (endpointAssignments.containsKey(endpoint)) {
+        endpointAssignments.put(endpoint, endpointAssignments.get(endpoint) + 1);
+ } else {
+ endpointAssignments.put(endpoint, 1);
+ }
+ }
+
+    // Verify number of fragments assigned to each endpoint against the expected value.
+    for (Map.Entry<DrillbitEndpoint, Integer> endpointAssignment : endpointAssignments.entrySet()) {
+      assertEquals(expectedAssignments.get(endpointAssignment.getKey()).intValue(),
+          endpointAssignment.getValue().intValue());
+ }
+ }
+
+ @Test
+ public void testEqualLocalWorkUnitsUnderNodeLimit() throws Exception {
+    final Wrapper wrapper = newWrapper(200, 1, 80, /* cost, minWidth, maxWidth */
+        ImmutableList.of( /* endpointAffinities */
+            /* For local affinity, we only care about numLocalWorkUnits, the last column below */
+            /* endpoint, affinity_value, mandatory, maxWidth, numLocalWorkUnits */
+ new EndpointAffinity(DEP1, 0.15, false, MAX_VALUE, 16),
+ new EndpointAffinity(DEP2, 0.15, false, MAX_VALUE, 16),
+ new EndpointAffinity(DEP3, 0.10, false, MAX_VALUE, 16),
+ new EndpointAffinity(DEP4, 0.20, false, MAX_VALUE, 16),
+ new EndpointAffinity(DEP5, 0.20, false, MAX_VALUE, 16)
+ ));
+    INSTANCE.parallelizeFragment(wrapper,
+        newParameters(1 /* sliceTarget */, 23 /* maxWidthPerNode */, 200 /* globalMaxWidth */),
+        ImmutableList.of(DEP1, DEP2, DEP3, DEP4, DEP5));
+    // Everyone should get assigned 16 because:
+    // the parallelization maxWidth (80) is below the globalMaxWidth (200), and
+    // the localWorkUnits of every node is below maxWidthPerNode (23).
+    Map<DrillbitEndpoint, Integer> expectedAssignments = ImmutableMap.of(
+        DEP1, 16,
+        DEP2, 16,
+        DEP3, 16,
+        DEP4, 16,
+        DEP5, 16);
+ // Expect the fragment parallelization to be 80 (16 * 5)
+ assertEquals(80, wrapper.getWidth());
+
+    final List<DrillbitEndpoint> assignedEndpoints = wrapper.getAssignedEndpoints();
+ assertEquals(80, assignedEndpoints.size());
+ assertTrue(assignedEndpoints.contains(DEP1));
+ assertTrue(assignedEndpoints.contains(DEP2));
+ assertTrue(assignedEndpoints.contains(DEP3));
+ assertTrue(assignedEndpoints.contains(DEP4));
+ assertTrue(assignedEndpoints.contains(DEP5));
+
+ checkEndpointAssignments(assignedEndpoints, expectedAssignments);
+ }
+
+ @Test
+ public void testEqualLocalWorkUnitsAboveNodeLimit() throws Exception {
+    final Wrapper wrapper = newWrapper(200, 1, 80, /* cost, minWidth, maxWidth */
+        ImmutableList.of( /* endpointAffinities */
+            /* For local affinity, we only care about numLocalWorkUnits, the last column below */
+            /* endpoint, affinity_value, mandatory, maxWidth, numLocalWorkUnits */
+ new EndpointAffinity(DEP1, 0.15, false, MAX_VALUE, 16),
+ new EndpointAffinity(DEP2, 0.15, false, MAX_VALUE, 16),
+ new EndpointAffinity(DEP3, 0.10, false, MAX_VALUE, 16),
+ new EndpointAffinity(DEP4, 0.20, false, MAX_VALUE, 16),
+ new EndpointAffinity(DEP5, 0.20, false, MAX_VALUE, 16)
+ ));
+    INSTANCE.parallelizeFragment(wrapper,
+        newParameters(1 /* sliceTarget */, 8 /* maxWidthPerNode */, 200 /* globalMaxWidth */),
+        ImmutableList.of(DEP1, DEP2, DEP3, DEP4, DEP5));
+    // Everyone should get assigned 8 because:
+    // maxWidthPerNode is 8 and the localWorkUnits of every node is above maxWidthPerNode.
+    // Also, the parallelization maxWidth (80) is below the globalMaxWidth (200).
+    Map<DrillbitEndpoint, Integer> expectedAssignments = ImmutableMap.of(
+        DEP1, 8,
+        DEP2, 8,
+        DEP3, 8,
+        DEP4, 8,
+        DEP5, 8);
+ // Expect the fragment parallelization to be 80 (16 * 5)
--- End diff ---
Wrong comment. Should be 40 (8 * 5), since maxWidthPerNode is 8 in this test.
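For reference, a minimal sketch of how the corrected expectation could read (assuming the assertions mirror the first test; 40 follows from the 5 endpoints each being capped at the maxWidthPerNode of 8):

    // Expect the fragment parallelization to be 40 (8 * 5).
    assertEquals(40, wrapper.getWidth());

    final List<DrillbitEndpoint> assignedEndpoints = wrapper.getAssignedEndpoints();
    assertEquals(40, assignedEndpoints.size());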
> Fragment planning causes Drillbits to read remote chunks when local copies
> are available
> ----------------------------------------------------------------------------------------
>
> Key: DRILL-4706
> URL: https://issues.apache.org/jira/browse/DRILL-4706
> Project: Apache Drill
> Issue Type: Bug
> Components: Query Planning & Optimization
> Affects Versions: 1.6.0
> Environment: CentOS, RHEL
> Reporter: Kunal Khatua
> Assignee: Sorabh Hamirwasia
> Labels: performance, planning
>
> When a table (datasize=70GB) of 160 parquet files (each having a single
> rowgroup and fitting within one chunk) is available on a 10-node setup with
> replication=3; a pure data scan query causes about 2% of the data to be read
> remotely.
> Even with the creation of metadata cache, the planner is selecting a
> sub-optimal plan of executing the SCAN fragments such that some of the data
> is served from a remote server.