jackylk commented on a change in pull request #3641: [CARBONDATA-3737] support prestodb and prestosql URL: https://github.com/apache/carbondata/pull/3641#discussion_r392239003
########## File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/impl/CarbonTableReader.java ########## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.presto.impl; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datamap.DataMapFilter; +import org.apache.carbondata.core.datamap.DataMapStoreManager; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.indexstore.PartitionSpec; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.metadata.CarbonMetadata; +import org.apache.carbondata.core.metadata.SegmentFileStore; +import 
org.apache.carbondata.core.metadata.converter.SchemaConverter; +import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl; +import org.apache.carbondata.core.metadata.schema.PartitionInfo; +import org.apache.carbondata.core.metadata.schema.partition.PartitionType; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.TableInfo; +import org.apache.carbondata.core.reader.ThriftReader; +import org.apache.carbondata.core.scan.expression.Expression; +import org.apache.carbondata.core.statusmanager.FileFormat; +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; +import org.apache.carbondata.core.statusmanager.SegmentStatusManager; +import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.carbondata.hadoop.CarbonInputSplit; +import org.apache.carbondata.hadoop.api.CarbonInputFormat; +import org.apache.carbondata.hadoop.api.CarbonTableInputFormat; +import org.apache.carbondata.presto.PrestoFilterUtil; + +import com.google.gson.Gson; +import com.google.inject.Inject; +import io.prestosql.plugin.hive.HiveColumnHandle; +import io.prestosql.spi.connector.SchemaTableName; +import io.prestosql.spi.predicate.TupleDomain; +import org.apache.commons.collections.CollectionUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.log4j.Logger; +import org.apache.thrift.TBase; + +import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY; +import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT; +import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY; + +/** + * CarbonTableReader will be a facade of these utils + 
* 1:CarbonMetadata,(logic table) + * 2:FileFactory, (physic table file) + * 3:CarbonCommonFactory, (offer some ) + * 4:DictionaryFactory, (parse dictionary util) + * Currently, it is mainly used to parse metadata of tables under + * the configured carbondata-store path and filter the relevant + * input splits with given query predicates. + */ +public class CarbonTableReader { + + // default PathFilter, accepts files in carbondata format (with .carbondata extension). + private static final PathFilter DefaultFilter = new PathFilter() { + @Override + public boolean accept(Path path) { + return CarbonTablePath.isCarbonDataFile(path.getName()); + } + }; + public CarbonTableConfig config; + /** + * A cache for Carbon reader, with this cache, + * metadata of a table is only read from file system once. + */ + private AtomicReference<Map<SchemaTableName, CarbonTableCacheModel>> carbonCache; + + private String queryId; + + /** + * Logger instance + */ + private static final Logger LOGGER = + LogServiceFactory.getLogService(CarbonTableReader.class.getName()); + + /** + * List Of Schemas + */ + private List<String> schemaNames = new ArrayList<>(); + + @Inject public CarbonTableReader(CarbonTableConfig config) { + this.config = Objects.requireNonNull(config, "CarbonTableConfig is null"); + this.carbonCache = new AtomicReference(new ConcurrentHashMap<>()); + populateCarbonProperties(); + } + + /** + * For presto worker node to initialize the metadata cache of a table. + * + * @param table the name of the table and schema. 
+ * @return + */ + public CarbonTableCacheModel getCarbonCache(SchemaTableName table, String location, + Configuration config) { + updateSchemaTables(table, config); + CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(table); + if (carbonTableCacheModel == null || !carbonTableCacheModel.isValid()) { + return parseCarbonMetadata(table, location, config); + } + return carbonTableCacheModel; + } + + /** + * Find all the tables under the schema store path (this.carbonFileList) + * and cache all the table names in this.tableList. Notice that whenever this method + * is called, it clears this.tableList and populate the list by reading the files. + */ + private void updateSchemaTables(SchemaTableName schemaTableName, Configuration config) { + CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(schemaTableName); + if (carbonTableCacheModel != null && + carbonTableCacheModel.getCarbonTable().isTransactionalTable()) { + CarbonTable carbonTable = carbonTableCacheModel.getCarbonTable(); + long latestTime = FileFactory.getCarbonFile(CarbonTablePath + .getSchemaFilePath( + carbonTable.getTablePath()), + config).getLastModifiedTime(); + carbonTableCacheModel.setCurrentSchemaTime(latestTime); + if (!carbonTableCacheModel.isValid()) { + // Invalidate datamaps + DataMapStoreManager.getInstance() + .clearDataMaps(carbonTableCacheModel.getCarbonTable().getAbsoluteTableIdentifier()); + } + } + } + + /** + * Read the metadata of the given table + * and cache it in this.carbonCache (CarbonTableReader cache). + * + * @param table name of the given table. + * @return the CarbonTableCacheModel instance which contains all the needed metadata for a table. + */ + private CarbonTableCacheModel parseCarbonMetadata(SchemaTableName table, String tablePath, + Configuration config) { + try { + CarbonTableCacheModel cache = getValidCacheBySchemaTableName(table); + if (cache != null) { + return cache; + } + // multiple tasks can be launched in a worker concurrently. 
Hence need to synchronize this. + synchronized (this) { + // cache might be filled by another thread, so if filled use that cache. + CarbonTableCacheModel cacheModel = getValidCacheBySchemaTableName(table); + if (cacheModel != null) { + return cacheModel; + } + // Step 1: get store path of the table and cache it. + String schemaFilePath = CarbonTablePath.getSchemaFilePath(tablePath, config); + // If metadata folder exists, it is a transactional table + CarbonFile schemaFile = FileFactory.getCarbonFile(schemaFilePath, config); + boolean isTransactionalTable = schemaFile.exists(); + org.apache.carbondata.format.TableInfo tableInfo; + long modifiedTime = System.currentTimeMillis(); + if (isTransactionalTable) { + //Step 2: read the metadata (tableInfo) of the table. + ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() { + // TBase is used to read and write thrift objects. + // TableInfo is a kind of TBase used to read and write table information. + // TableInfo is generated by thrift, + // see schema.thrift under format/src/main/thrift for details. + public TBase create() { + return new org.apache.carbondata.format.TableInfo(); + } + }; + ThriftReader thriftReader = new ThriftReader(schemaFilePath, createTBase, config); + thriftReader.open(); + tableInfo = (org.apache.carbondata.format.TableInfo) thriftReader.read(); + thriftReader.close(); + modifiedTime = schemaFile.getLastModifiedTime(); + } else { + tableInfo = CarbonUtil.inferSchema(tablePath, table.getTableName(), false, config); + } + // Step 3: convert format level TableInfo to code level TableInfo + SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl(); + // wrapperTableInfo is the code level information of a table in carbondata core, + // different from the Thrift TableInfo. 
+ TableInfo wrapperTableInfo = schemaConverter + .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(), + tablePath); + wrapperTableInfo.setTransactionalTable(isTransactionalTable); + CarbonMetadata.getInstance().removeTable(wrapperTableInfo.getTableUniqueName()); + // Step 4: Load metadata info into CarbonMetadata + CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo); + CarbonTable carbonTable = Objects.requireNonNull(CarbonMetadata.getInstance() + .getCarbonTable(table.getSchemaName(), table.getTableName()), "carbontable is null"); + cache = new CarbonTableCacheModel(modifiedTime, carbonTable); + // cache the table + carbonCache.get().put(table, cache); + cache.setCarbonTable(carbonTable); + } + return cache; + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + private CarbonTableCacheModel getValidCacheBySchemaTableName(SchemaTableName schemaTableName) { + CarbonTableCacheModel cache = carbonCache.get().get(schemaTableName); + if (cache != null && cache.isValid()) { + return cache; + } + return null; + } + + public List<CarbonLocalMultiBlockSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel, + Expression filters, TupleDomain<HiveColumnHandle> constraints, Configuration config) + throws IOException { + List<CarbonLocalInputSplit> result = new ArrayList<>(); + List<CarbonLocalMultiBlockSplit> multiBlockSplitList = new ArrayList<>(); + CarbonTable carbonTable = tableCacheModel.getCarbonTable(); + TableInfo tableInfo = tableCacheModel.getCarbonTable().getTableInfo(); + config.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, ""); + String carbonTablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath(); + config.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath); + config.set(CarbonTableInputFormat.DATABASE_NAME, carbonTable.getDatabaseName()); + config.set(CarbonTableInputFormat.TABLE_NAME, carbonTable.getTableName()); + config.set("query.id", queryId); + 
CarbonInputFormat.setTransactionalTable(config, carbonTable.isTransactionalTable()); + CarbonInputFormat.setTableInfo(config, carbonTable.getTableInfo()); + + JobConf jobConf = new JobConf(config); + List<PartitionSpec> filteredPartitions = new ArrayList<>(); + + PartitionInfo partitionInfo = carbonTable.getPartitionInfo(); + LoadMetadataDetails[] loadMetadataDetails = null; + if (partitionInfo != null && partitionInfo.getPartitionType() == PartitionType.NATIVE_HIVE) { + try { + loadMetadataDetails = SegmentStatusManager.readTableStatusFile( + CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath())); + } catch (IOException e) { + LOGGER.error(e.getMessage(), e); + throw e; + } + filteredPartitions = findRequiredPartitions(constraints, carbonTable, loadMetadataDetails); + } + try { + CarbonTableInputFormat.setTableInfo(config, tableInfo); + CarbonTableInputFormat carbonTableInputFormat = + createInputFormat(jobConf, carbonTable.getAbsoluteTableIdentifier(), + new DataMapFilter(carbonTable, filters, true), filteredPartitions); + Job job = Job.getInstance(jobConf); + List<InputSplit> splits = carbonTableInputFormat.getSplits(job); + Gson gson = new Gson(); + if (splits != null && splits.size() > 0) { + for (InputSplit inputSplit : splits) { + CarbonInputSplit carbonInputSplit = (CarbonInputSplit) inputSplit; + result.add(new CarbonLocalInputSplit(carbonInputSplit.getSegmentId(), + carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(), + carbonInputSplit.getLength(), Arrays.asList(carbonInputSplit.getLocations()), + carbonInputSplit.getNumberOfBlocklets(), carbonInputSplit.getVersion().number(), + carbonInputSplit.getDeleteDeltaFiles(), carbonInputSplit.getBlockletId(), + gson.toJson(carbonInputSplit.getDetailInfo()), + carbonInputSplit.getFileFormat().ordinal())); + } + + // Use block distribution + List<List<CarbonLocalInputSplit>> inputSplits = new ArrayList( + result.stream().map(x -> (CarbonLocalInputSplit) x).collect(Collectors.groupingBy( 
+ carbonInput -> { + if (FileFormat.ROW_V1.equals(carbonInput.getFileFormat())) { + return carbonInput.getSegmentId().concat(carbonInput.getPath()) + .concat(carbonInput.getStart() + ""); + } + return carbonInput.getSegmentId().concat(carbonInput.getPath()); + })).values()); + if (inputSplits != null) { + for (int j = 0; j < inputSplits.size(); j++) { + multiBlockSplitList.add(new CarbonLocalMultiBlockSplit(inputSplits.get(j), + inputSplits.get(j).stream().flatMap(f -> Arrays.stream(getLocations(f))).distinct() Review comment: Is there a more efficient way to do this in case of millions of splits? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
