vvysotskyi commented on a change in pull request #1886: DRILL-7273: Introduce operators for handling metadata URL: https://github.com/apache/drill/pull/1886#discussion_r344196105
########## File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHandlerBatch.java ########## @@ -0,0 +1,491 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.metadata; + +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.exception.OutOfMemoryException; +import org.apache.drill.exec.metastore.analyze.MetastoreAnalyzeConstants; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.config.MetadataHandlerPOP; +import org.apache.drill.exec.metastore.analyze.AnalyzeColumnUtils; +import org.apache.drill.exec.physical.resultSet.ResultSetLoader; +import org.apache.drill.exec.physical.resultSet.RowSetLoader; +import org.apache.drill.exec.physical.resultSet.impl.OptionBuilder; +import org.apache.drill.exec.physical.resultSet.impl.ResultSetLoaderImpl; +import org.apache.drill.exec.physical.rowSet.DirectRowSet; +import org.apache.drill.exec.physical.rowSet.RowSetReader; +import 
org.apache.drill.exec.metastore.analyze.MetadataIdentifierUtils; +import org.apache.drill.exec.record.AbstractSingleRecordBatch; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.record.metadata.SchemaBuilder; +import org.apache.drill.exec.vector.VarCharVector; +import org.apache.drill.metastore.components.tables.BasicTablesRequests; +import org.apache.drill.metastore.components.tables.Tables; +import org.apache.drill.metastore.metadata.BaseMetadata; +import org.apache.drill.metastore.metadata.FileMetadata; +import org.apache.drill.metastore.metadata.LocationProvider; +import org.apache.drill.metastore.metadata.MetadataInfo; +import org.apache.drill.metastore.metadata.MetadataType; +import org.apache.drill.metastore.metadata.RowGroupMetadata; +import org.apache.drill.metastore.metadata.SegmentMetadata; +import org.apache.drill.metastore.statistics.ExactStatisticsConstants; +import org.apache.drill.metastore.statistics.StatisticsKind; +import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE; +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP; + +/** + * Operator responsible for handling metadata returned by incoming aggregate operators and fetching + * required metadata from the metastore.
+ */ +public class MetadataHandlerBatch extends AbstractSingleRecordBatch<MetadataHandlerPOP> { + private static final Logger logger = LoggerFactory.getLogger(MetadataHandlerBatch.class); + + private final Tables tables; + private final MetadataType metadataType; + private final Map<String, MetadataInfo> metadataToHandle; + + private boolean firstBatch = true; + + protected MetadataHandlerBatch(MetadataHandlerPOP popConfig, + FragmentContext context, RecordBatch incoming) throws OutOfMemoryException { + super(popConfig, context, incoming); + this.tables = context.getMetastoreRegistry().get().tables(); + this.metadataType = popConfig.getMetadataHandlerContext().metadataType(); + this.metadataToHandle = popConfig.getMetadataHandlerContext().metadataToHandle() != null + ? popConfig.getMetadataHandlerContext().metadataToHandle().stream() + .collect(Collectors.toMap(MetadataInfo::identifier, Function.identity())) + : null; + } + + @Override + public IterOutcome doWork() { + // 1. Consume data from incoming operators and update metadataToHandle to remove incoming metadata + // 2. For the case when incoming operator returned nothing - no updated underlying metadata was found. + // 3. 
Fetches metadata which should be handled but wasn't returned by incoming batch from the metastore + + IterOutcome outcome = next(incoming); + + switch (outcome) { + case NONE: + if (firstBatch) { + Preconditions.checkState(metadataToHandle.isEmpty(), + "Incoming batch didn't return the result for modified segments"); + } + return outcome; + case OK_NEW_SCHEMA: + if (firstBatch) { + firstBatch = false; + if (!setupNewSchema()) { + outcome = IterOutcome.OK; + } + } + doWorkInternal(); + return outcome; + case OK: + assert !firstBatch : "First batch should be OK_NEW_SCHEMA"; + doWorkInternal(); + // fall thru + case OUT_OF_MEMORY: + case NOT_YET: + case STOP: + return outcome; + default: + throw new UnsupportedOperationException("Unsupported upstream state " + outcome); + } + } + + @Override + public IterOutcome innerNext() { + IterOutcome outcome = getLastKnownOutcome(); + if (outcome != NONE && outcome != STOP) { + outcome = super.innerNext(); + } + // if incoming is exhausted, reads metadata which should be obtained from the metastore + // and returns OK or NONE if there is no metadata to read + if (outcome == IterOutcome.NONE && !metadataToHandle.isEmpty()) { + BasicTablesRequests basicTablesRequests = tables.basicRequests(); + + switch (metadataType) { + case ROW_GROUP: { + List<RowGroupMetadata> rowGroups = + basicTablesRequests.rowGroupsMetadata( + popConfig.getMetadataHandlerContext().tableInfo(), + new ArrayList<>(metadataToHandle.values())); + return populateContainer(rowGroups); + } + case FILE: { + List<FileMetadata> files = + basicTablesRequests.filesMetadata( + popConfig.getMetadataHandlerContext().tableInfo(), + new ArrayList<>(metadataToHandle.values())); + return populateContainer(files); + } + case SEGMENT: { + List<SegmentMetadata> segments = + basicTablesRequests.segmentsMetadata( + popConfig.getMetadataHandlerContext().tableInfo(), + new ArrayList<>(metadataToHandle.values())); + return populateContainer(segments); + } + } + } + return outcome; + 
} + + private <T extends BaseMetadata & LocationProvider> IterOutcome populateContainer(List<T> metadata) { + VectorContainer populatedContainer; + if (firstBatch) { + populatedContainer = writeMetadata(metadata); + setupSchemaFromContainer(populatedContainer); + } else { + populatedContainer = writeMetadataUsingBatchSchema(metadata); + } + container.transferIn(populatedContainer); + container.setRecordCount(populatedContainer.getRecordCount()); + + if (firstBatch) { + firstBatch = false; + return IterOutcome.OK_NEW_SCHEMA; + } else { + return IterOutcome.OK; + } + } + + @SuppressWarnings("unchecked") + private <T extends BaseMetadata & LocationProvider> VectorContainer writeMetadata(List<T> metadataList) { + BaseMetadata firstElement = metadataList.iterator().next(); + + ResultSetLoader resultSetLoader = getResultSetLoaderForMetadata(firstElement); + resultSetLoader.startBatch(); + RowSetLoader rowWriter = resultSetLoader.writer(); + Iterator<T> segmentsIterator = metadataList.iterator(); + while (!rowWriter.isFull() && segmentsIterator.hasNext()) { + T metadata = segmentsIterator.next(); + metadataToHandle.remove(metadata.getMetadataInfo().identifier()); + + List<Object> arguments = new ArrayList<>(); + // adds required segment names to the arguments + arguments.add(metadata.getPath().toUri().getPath()); + Collections.addAll( + arguments, + Arrays.copyOf( + MetadataIdentifierUtils.getValuesFromMetadataIdentifier(metadata.getMetadataInfo().identifier()), + popConfig.getMetadataHandlerContext().segmentColumns().size())); + + // adds column statistics values assuming that they are sorted in alphabetic order + metadata.getColumnsStatistics().entrySet().stream() + .sorted(Comparator.comparing(e -> e.getKey().toExpr())) + .map(Map.Entry::getValue) + .flatMap(columnStatistics -> + AnalyzeColumnUtils.COLUMN_STATISTICS_FUNCTIONS.keySet().stream() + .map(columnStatistics::get)) + .forEach(arguments::add); + + AnalyzeColumnUtils.META_STATISTICS_FUNCTIONS.keySet().stream() + 
.map(metadata::getStatistic) + .forEach(arguments::add); + + // collectedMap field value + arguments.add(null); + + if (metadataType == MetadataType.SEGMENT) { + arguments.add(((SegmentMetadata) metadata).getLocations().stream() + .map(path -> path.toUri().getPath()) + .toArray(String[]::new)); + } + + if (metadataType == MetadataType.ROW_GROUP) { + arguments.add(Long.toString(((RowGroupMetadata) metadata).getRowGroupIndex())); Review comment: Thanks, replaced. However, I left it as is in places like the line below, because there the wrong overloaded method was chosen and the code failed with a ClassCastException (CCE). ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
