[
https://issues.apache.org/jira/browse/DRILL-4706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15627397#comment-15627397
]
ASF GitHub Bot commented on DRILL-4706:
---------------------------------------
Github user sohami commented on a diff in the pull request:
https://github.com/apache/drill/pull/639#discussion_r86060309
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java ---
@@ -822,10 +838,103 @@ private void getFiles(String path, List<FileStatus> fileStatuses) throws IOExcep
     }
   }

+  /*
+   * Figure out the best node to scan each of the rowGroups and update the preferredEndpoint.
+   * Based on this, update the total work units assigned to the endpoint in the endpointAffinity.
+   */
+  private void computeRowGroupAssignment() {
+    Map<DrillbitEndpoint, Integer> numEndpointAssignments = Maps.newHashMap();
+    Map<DrillbitEndpoint, Long> numAssignedBytes = Maps.newHashMap();
+
+    // Do this for 2 iterations to adjust node assignments after the first iteration.
+    int numIterations = 2;
+
+    while (numIterations-- > 0) {
+
+      for (RowGroupInfo rowGroupInfo : rowGroupInfos) {
+        EndpointByteMap endpointByteMap = rowGroupInfo.getByteMap();
+
+        // This can be empty for a local file system or if no drillbit is running
+        // on the hosts which have the data.
+        if (endpointByteMap.isEmpty()) {
+          continue;
+        }
+
+        // Get the list of endpoints which have the maximum (equal) amount of data.
+        List<DrillbitEndpoint> topEndpoints = endpointByteMap.getTopEndpoints();
+
+        long minBytes = 0, numBytes = 0;
+        DrillbitEndpoint nodePicked = rowGroupInfo.preferredEndpoint;
+        if (nodePicked != null && numAssignedBytes.containsKey(nodePicked)) {
+          minBytes = numAssignedBytes.get(nodePicked);
+        }
+
+        DrillbitEndpoint previousNodePicked = nodePicked;
+
+        for (DrillbitEndpoint endpoint : topEndpoints) {
+          if (nodePicked == null) {
+            nodePicked = endpoint;
+            if (numAssignedBytes.containsKey(nodePicked)) {
+              minBytes = numAssignedBytes.get(nodePicked);
+            }
+          }
+
+          if (numAssignedBytes.containsKey(endpoint)) {
+            numBytes = numAssignedBytes.get(endpoint);
+          } else {
+            numBytes = 0;
+          }
+
+          if (numBytes < minBytes) {
+            nodePicked = endpoint;
+            minBytes = numBytes;
+          }
+        }
+
+        if (nodePicked != null && nodePicked != previousNodePicked) {
+          numAssignedBytes.put(nodePicked, minBytes + endpointByteMap.get(nodePicked));
+          if (numEndpointAssignments.containsKey(nodePicked)) {
+            numEndpointAssignments.put(nodePicked, numEndpointAssignments.get(nodePicked) + 1);
+          } else {
+            numEndpointAssignments.put(nodePicked, 1);
+          }
+
+          // If a different node is picked in the second iteration, update the previous one.
+          if (previousNodePicked != null) {
+            numAssignedBytes.put(previousNodePicked,
+                numAssignedBytes.get(previousNodePicked) - endpointByteMap.get(previousNodePicked));
+            numEndpointAssignments.put(previousNodePicked, numEndpointAssignments.get(previousNodePicked) - 1);
+          }
+        }
+        rowGroupInfo.preferredEndpoint = nodePicked;
+      }
+    }
+
+    // Set the number of local work units for each endpoint in the endpointAffinity.
+    for (EndpointAffinity epAff : endpointAffinities) {
+      DrillbitEndpoint endpoint = epAff.getEndpoint();
+      if (numEndpointAssignments.containsKey(endpoint)) {
+        epAff.setNumLocalWorkUnits(numEndpointAssignments.get(endpoint));
+      } else {
+        epAff.setNumLocalWorkUnits(0);
--- End diff ---
"else" condition is not required since by default it will be set to 0
> Fragment planning causes Drillbits to read remote chunks when local copies
> are available
> ----------------------------------------------------------------------------------------
>
> Key: DRILL-4706
> URL: https://issues.apache.org/jira/browse/DRILL-4706
> Project: Apache Drill
> Issue Type: Bug
> Components: Query Planning & Optimization
> Affects Versions: 1.6.0
> Environment: CentOS, RHEL
> Reporter: Kunal Khatua
> Assignee: Sorabh Hamirwasia
> Labels: performance, planning
>
> When a table (datasize=70GB) of 160 parquet files (each having a single
> rowgroup and fitting within one chunk) is available on a 10-node setup with
> replication=3, a pure data-scan query causes about 2% of the data to be read
> remotely.
> Even with the metadata cache created, the planner selects a sub-optimal plan
> for executing the SCAN fragments, such that some of the data is served from
> a remote server.
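The patch quoted above addresses this with a greedy pass over the row groups: each row group is assigned to whichever of its candidate replica hosts currently carries the least assigned work, so scans stay local while the load stays balanced. A standalone sketch of that idea, using plain strings and toy byte counts instead of Drill's DrillbitEndpoint and EndpointByteMap classes, might look like the following.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only, in the spirit of computeRowGroupAssignment() in
// the diff above: greedily assign each row group to its least-loaded replica
// host. Hosts are plain strings, not Drill's DrillbitEndpoint/EndpointByteMap.
public class LocalityAssignmentSketch {

  public static void main(String[] args) {
    // Each row group maps replica host -> bytes of that row group held locally.
    List<Map<String, Long>> rowGroups = new ArrayList<>();
    rowGroups.add(Map.of("nodeA", 256L, "nodeB", 256L, "nodeC", 256L));
    rowGroups.add(Map.of("nodeA", 256L, "nodeB", 256L, "nodeD", 256L));
    rowGroups.add(Map.of("nodeA", 256L, "nodeC", 256L, "nodeD", 256L));

    Map<String, Long> assignedBytes = new HashMap<>();      // bytes assigned per host so far
    Map<String, Integer> localWorkUnits = new HashMap<>();  // row groups assigned per host

    for (Map<String, Long> replicas : rowGroups) {
      String picked = null;
      long pickedLoad = Long.MAX_VALUE;
      // Among the hosts that hold a local copy, pick the one with the least
      // work assigned so far; every scan stays local and the load stays even.
      for (String host : replicas.keySet()) {
        long load = assignedBytes.getOrDefault(host, 0L);
        if (load < pickedLoad) {
          picked = host;
          pickedLoad = load;
        }
      }
      assignedBytes.merge(picked, replicas.get(picked), Long::sum);
      localWorkUnits.merge(picked, 1, Integer::sum);
    }

    // One possible balanced outcome; ties are broken by map iteration order.
    System.out.println(localWorkUnits);
  }
}

The real implementation in the diff additionally runs a second pass so earlier assignments can be revisited once every endpoint's load is known, and it only considers the endpoints that hold the most data for each row group.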