akashrn5 commented on a change in pull request #3875:
URL: https://github.com/apache/carbondata/pull/3875#discussion_r491866753
##########
File path:
core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
##########
@@ -2525,4 +2525,9 @@ private CarbonCommonConstants() {
* property which defines the presto query default value
*/
public static final String IS_QUERY_FROM_PRESTO_DEFAULT = "false";
+
+ /**
+ * property to send load model from coordinator to worker in presto
+ */
+ public static final String CARBON_PRESTO_LOAD_MODEL =
"presto.carbondata.encoded.loadmodel";
Review comment:
It's not configurable by the user; you are right. I have moved the constant
to `CarbonTableConfig` in the presto module and renamed it to
`carbondata.presto.encoded.loadmodel`.
##########
File path:
core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
##########
@@ -2525,4 +2525,9 @@ private CarbonCommonConstants() {
* property which defines the presto query default value
*/
public static final String IS_QUERY_FROM_PRESTO_DEFAULT = "false";
+
+ /**
+ * property to send load model from coordinator to worker in presto
Review comment:
Not a user config; same as the comment above. Moved to the presto module.
##########
File path:
integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
+import org.apache.carbondata.hive.CarbonHiveSerDe;
+import org.apache.carbondata.hive.MapredCarbonOutputFormat;
+import org.apache.carbondata.presto.impl.CarbonTableConfig;
+
+import com.google.common.collect.ImmutableList;
+import io.prestosql.plugin.hive.HiveFileWriter;
+import io.prestosql.plugin.hive.HiveType;
+import io.prestosql.plugin.hive.HiveWriteUtils;
+import io.prestosql.spi.Page;
+import io.prestosql.spi.PrestoException;
+import io.prestosql.spi.block.Block;
+import io.prestosql.spi.type.Type;
+import io.prestosql.spi.type.TypeManager;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import
org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT;
+
+/**
+ * This class implements HiveFileWriter and it creates the carbonFileWriter to
write the age data
+ * sent from presto.
+ */
+public class CarbonDataFileWriter implements HiveFileWriter {
+
+ private static final Logger LOG =
+ LogServiceFactory.getLogService(CarbonDataFileWriter.class.getName());
+
+ private final JobConf configuration;
+ private Path outPutPath;
+ private final FileSinkOperator.RecordWriter recordWriter;
+ private final CarbonHiveSerDe serDe;
+ private final int fieldCount;
+ private final Object row;
+ private final SettableStructObjectInspector tableInspector;
+ private final List<StructField> structFields;
+ private final HiveWriteUtils.FieldSetter[] setters;
+
+ private boolean isCommitDone;
+
+ public CarbonDataFileWriter(Path outPutPath, List<String> inputColumnNames,
Properties properties,
+ JobConf configuration, TypeManager typeManager) throws SerDeException {
+ this.outPutPath = requireNonNull(outPutPath, "path is null");
+ this.outPutPath = new Path(properties.getProperty("location"));
+ outPutPath = new Path(properties.getProperty("location"));
+ this.configuration = requireNonNull(configuration, "conf is null");
+ List<String> columnNames = Arrays
+ .asList(properties.getProperty(IOConstants.COLUMNS,
"").split(CarbonCommonConstants.COMMA));
+ List<Type> fileColumnTypes =
+ HiveType.toHiveTypes(properties.getProperty(IOConstants.COLUMNS_TYPES,
"")).stream()
+ .map(hiveType -> hiveType.getType(typeManager)).collect(toList());
+ fieldCount = columnNames.size();
+ serDe = new CarbonHiveSerDe();
+ serDe.initialize(configuration, properties);
+ tableInspector = (ArrayWritableObjectInspector) serDe.getObjectInspector();
+
+ structFields = ImmutableList.copyOf(
+ inputColumnNames.stream().map(tableInspector::getStructFieldRef)
+ .collect(toImmutableList()));
+
+ row = tableInspector.create();
+
+ setters = new HiveWriteUtils.FieldSetter[structFields.size()];
+ for (int i = 0; i < setters.length; i++) {
+ setters[i] = HiveWriteUtils.createFieldSetter(tableInspector, row,
structFields.get(i),
+ fileColumnTypes.get(structFields.get(i).getFieldID()));
+ }
+ String encodedLoadModel =
configuration.get(CarbonTableConfig.CARBON_PRESTO_LOAD_MODEL);
+ if (StringUtils.isNotEmpty(encodedLoadModel)) {
Review comment:
The `encodedLoadModel` value will be a string, basically the serialized load
model. We use this property only to put the encoded load model prepared in
`setupJob()` into the conf, so that it is transferred from the coordinator to
all the workers.
##########
File path:
integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java
##########
@@ -40,6 +40,12 @@
private String endPoint;
private String pushRowFilter;
+ /**
+ * Property to send load model from coordinator to worker in presto. This is
internal constant
+ * and not exposed to user.
Review comment:
As said in the comment above, it's the same: we use this as the property name
to send the load model from the coordinator to the workers, so its value will
be the load model prepared for each load.
##########
File path:
integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataLocationService.java
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import com.google.inject.Inject;
+import io.prestosql.plugin.hive.HdfsEnvironment;
+import io.prestosql.plugin.hive.HiveLocationService;
+import io.prestosql.plugin.hive.HiveWriteUtils;
+import io.prestosql.plugin.hive.LocationHandle;
+import io.prestosql.plugin.hive.metastore.SemiTransactionalHiveMetastore;
+import io.prestosql.plugin.hive.metastore.Table;
+import io.prestosql.spi.connector.ConnectorSession;
+import org.apache.hadoop.fs.Path;
+
+public class CarbonDataLocationService extends HiveLocationService {
+
+ private final HdfsEnvironment hdfsEnvironment;
+
+ @Inject
+ public CarbonDataLocationService(HdfsEnvironment hdfsEnvironment) {
+ super(hdfsEnvironment);
+ this.hdfsEnvironment = hdfsEnvironment;
+ }
+
+ @Override
+ public LocationHandle forNewTable(SemiTransactionalHiveMetastore metastore,
+ ConnectorSession session, String schemaName, String tableName) {
+ // TODO: check and make it compatible for cloud scenario
Review comment:
If we don't override these methods, Presto gives each writer a temporary path
as its write path, similar to how Carbon uses a temp path during writing. That
conflicts with our write flow, so I have overridden them and made the write
path and the target path the same. In the Presto superclass, for S3 or any
encrypted store, no temporary write path or staging path is created. So we
need to test this once on S3 or OBS and, if it works fine, remove this TODO.
That is why I added the TODO here: I didn't have an S3/OBS setup, so I couldn't
test it. I tested on HDFS.
For reference, see:
https://github.com/prestosql/presto/blob/8b177120661e600b5595b18826f5c415b7824b81/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveLocationService.java#L55
https://github.com/prestosql/presto/blob/8b177120661e600b5595b18826f5c415b7824b81/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveLocationService.java#L76
##########
File path:
integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataMetaData.java
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Properties;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
+import org.apache.carbondata.hive.MapredCarbonOutputCommitter;
+import org.apache.carbondata.hive.util.HiveCarbonUtil;
+import org.apache.carbondata.presto.impl.CarbonTableConfig;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+
+import com.google.common.collect.ImmutableMap;
+import io.airlift.slice.Slice;
+import io.prestosql.plugin.hive.HdfsEnvironment;
+import io.prestosql.plugin.hive.HiveInsertTableHandle;
+import io.prestosql.plugin.hive.HiveMetadata;
+import io.prestosql.plugin.hive.HivePartitionManager;
+import io.prestosql.plugin.hive.LocationService;
+import io.prestosql.plugin.hive.PartitionUpdate;
+import io.prestosql.plugin.hive.TypeTranslator;
+import io.prestosql.plugin.hive.metastore.MetastoreUtil;
+import io.prestosql.plugin.hive.metastore.SemiTransactionalHiveMetastore;
+import io.prestosql.plugin.hive.metastore.Table;
+import io.prestosql.plugin.hive.security.AccessControlMetadata;
+import io.prestosql.plugin.hive.statistics.HiveStatisticsProvider;
+import io.prestosql.plugin.hive.util.ConfigurationUtils;
+import io.prestosql.spi.connector.ConnectorInsertTableHandle;
+import io.prestosql.spi.connector.ConnectorOutputMetadata;
+import io.prestosql.spi.connector.ConnectorSession;
+import io.prestosql.spi.connector.ConnectorTableHandle;
+import io.prestosql.spi.connector.SchemaTableName;
+import io.prestosql.spi.statistics.ComputedStatistics;
+import io.prestosql.spi.type.TypeManager;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContextImpl;
+import org.apache.hadoop.mapred.JobID;
+
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.log4j.Logger;
+import org.joda.time.DateTimeZone;
+
+public class CarbonDataMetaData extends HiveMetadata {
+
+ private static final Logger LOG =
+ LogServiceFactory.getLogService(CarbonDataMetaData.class.getName());
+
+ private HdfsEnvironment hdfsEnvironment;
+ private SemiTransactionalHiveMetastore metastore;
+ private MapredCarbonOutputCommitter carbonOutputCommitter;
+ private JobContextImpl jobContext;
+
+ public CarbonDataMetaData(SemiTransactionalHiveMetastore metastore,
+ HdfsEnvironment hdfsEnvironment, HivePartitionManager partitionManager,
DateTimeZone timeZone,
+ boolean allowCorruptWritesForTesting, boolean
writesToNonManagedTablesEnabled,
+ boolean createsOfNonManagedTablesEnabled, TypeManager typeManager,
+ LocationService locationService,
+ io.airlift.json.JsonCodec<PartitionUpdate> partitionUpdateCodec,
+ TypeTranslator typeTranslator, String prestoVersion,
+ HiveStatisticsProvider hiveStatisticsProvider, AccessControlMetadata
accessControlMetadata) {
+ super(metastore, hdfsEnvironment, partitionManager, timeZone,
allowCorruptWritesForTesting,
+ true, createsOfNonManagedTablesEnabled, typeManager,
+ locationService, partitionUpdateCodec, typeTranslator, prestoVersion,
+ hiveStatisticsProvider, accessControlMetadata);
+ this.hdfsEnvironment = hdfsEnvironment;
+ this.metastore = metastore;
+ }
+
+ @Override
+ public CarbonDataInsertTableHandle beginInsert(ConnectorSession session,
+ ConnectorTableHandle tableHandle) {
+ HiveInsertTableHandle hiveInsertTableHandle = super.beginInsert(session,
tableHandle);
+ SchemaTableName tableName = hiveInsertTableHandle.getSchemaTableName();
+ Optional<Table> table =
+ this.metastore.getTable(tableName.getSchemaName(),
tableName.getTableName());
+ Path outputPath =
+ new
Path(hiveInsertTableHandle.getLocationHandle().getJsonSerializableTargetPath());
+ JobConf jobConf = ConfigurationUtils.toJobConf(this.hdfsEnvironment
+ .getConfiguration(
+ new HdfsEnvironment.HdfsContext(session,
hiveInsertTableHandle.getSchemaName(),
+ hiveInsertTableHandle.getTableName()),
+ new
Path(hiveInsertTableHandle.getLocationHandle().getJsonSerializableWritePath())));
+ jobConf.set("location", outputPath.toString());
+ Properties hiveSchema = MetastoreUtil.getHiveSchema(table.get());
+ try {
+ CarbonLoadModel carbonLoadModel =
+ HiveCarbonUtil.getCarbonLoadModel(hiveSchema, jobConf);
+
+ CarbonTableOutputFormat.setLoadModel(jobConf, carbonLoadModel);
+ } catch (IOException ex) {
+ LOG.error("Error while creating carbon load model", ex);
+ throw new RuntimeException(ex);
+ }
+ try {
+ carbonOutputCommitter = new MapredCarbonOutputCommitter();
+ jobContext = new JobContextImpl(jobConf, new JobID());
+ carbonOutputCommitter.setupJob(jobContext);
+ ThreadLocalSessionInfo.setConfigurationToCurrentThread(jobConf);
+ } catch (IOException e) {
+ LOG.error("error setting the output committer", e);
+ throw new RuntimeException("error setting the output committer");
+ }
+ return new
CarbonDataInsertTableHandle(hiveInsertTableHandle.getSchemaTableName().getSchemaName(),
+ hiveInsertTableHandle.getTableName(),
+ hiveInsertTableHandle.getInputColumns(),
+ hiveInsertTableHandle.getPageSinkMetadata(),
+ hiveInsertTableHandle.getLocationHandle(),
+ hiveInsertTableHandle.getBucketProperty(),
hiveInsertTableHandle.getTableStorageFormat(),
+ hiveInsertTableHandle.getPartitionStorageFormat(),
+ ImmutableMap.of(CarbonTableConfig.CARBON_PRESTO_LOAD_MODEL,
+
jobContext.getConfiguration().get(CarbonTableOutputFormat.LOAD_MODEL)));
+ }
+
+ @Override
+ public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession
session,
+ ConnectorInsertTableHandle insertHandle, Collection<Slice> fragments,
+ Collection<ComputedStatistics> computedStatistics) {
+ Optional<ConnectorOutputMetadata> connectorOutputMetadata =
+ super.finishInsert(session, insertHandle, fragments,
computedStatistics);
+ try {
+ carbonOutputCommitter.commitJob(jobContext);
+ } catch (IOException e) {
+ LOG.error("Error occurred while committing the insert job.", e);
+ throw new RuntimeException(e);
Review comment:
If you look at `super.finishInsert`, it doesn't do much for our
transactional case. Carbon's read path understands our commit job (basically
the table status), so it is fine here. There was also another problem I
faced, but since this was developed some months back, I can't remember the
exact issue. With respect to Carbon, though, it's fine.
##########
File path:
integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataMetaData.java
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Properties;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
+import org.apache.carbondata.hive.MapredCarbonOutputCommitter;
+import org.apache.carbondata.hive.util.HiveCarbonUtil;
+import org.apache.carbondata.presto.impl.CarbonTableConfig;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+
+import com.google.common.collect.ImmutableMap;
+import io.airlift.slice.Slice;
+import io.prestosql.plugin.hive.HdfsEnvironment;
+import io.prestosql.plugin.hive.HiveInsertTableHandle;
+import io.prestosql.plugin.hive.HiveMetadata;
+import io.prestosql.plugin.hive.HivePartitionManager;
+import io.prestosql.plugin.hive.LocationService;
+import io.prestosql.plugin.hive.PartitionUpdate;
+import io.prestosql.plugin.hive.TypeTranslator;
+import io.prestosql.plugin.hive.metastore.MetastoreUtil;
+import io.prestosql.plugin.hive.metastore.SemiTransactionalHiveMetastore;
+import io.prestosql.plugin.hive.metastore.Table;
+import io.prestosql.plugin.hive.security.AccessControlMetadata;
+import io.prestosql.plugin.hive.statistics.HiveStatisticsProvider;
+import io.prestosql.plugin.hive.util.ConfigurationUtils;
+import io.prestosql.spi.connector.ConnectorInsertTableHandle;
+import io.prestosql.spi.connector.ConnectorOutputMetadata;
+import io.prestosql.spi.connector.ConnectorSession;
+import io.prestosql.spi.connector.ConnectorTableHandle;
+import io.prestosql.spi.connector.SchemaTableName;
+import io.prestosql.spi.statistics.ComputedStatistics;
+import io.prestosql.spi.type.TypeManager;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContextImpl;
+import org.apache.hadoop.mapred.JobID;
+
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.log4j.Logger;
+import org.joda.time.DateTimeZone;
+
+public class CarbonDataMetaData extends HiveMetadata {
+
+ private static final Logger LOG =
+ LogServiceFactory.getLogService(CarbonDataMetaData.class.getName());
+
+ private HdfsEnvironment hdfsEnvironment;
+ private SemiTransactionalHiveMetastore metastore;
+ private MapredCarbonOutputCommitter carbonOutputCommitter;
+ private JobContextImpl jobContext;
+
+ public CarbonDataMetaData(SemiTransactionalHiveMetastore metastore,
+ HdfsEnvironment hdfsEnvironment, HivePartitionManager partitionManager,
DateTimeZone timeZone,
+ boolean allowCorruptWritesForTesting, boolean
writesToNonManagedTablesEnabled,
Review comment:
The superclass has that many parameters in its constructor, so I followed the
same approach. It is called from only one place, so it should be fine?
##########
File path:
integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataPageSinkProvider.java
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalInt;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.inject.Inject;
+import io.airlift.event.client.EventClient;
+import io.airlift.json.JsonCodec;
+import io.airlift.units.DataSize;
+import io.prestosql.plugin.hive.HdfsEnvironment;
+import io.prestosql.plugin.hive.HiveConfig;
+import io.prestosql.plugin.hive.HiveFileWriterFactory;
+import io.prestosql.plugin.hive.HivePageSink;
+import io.prestosql.plugin.hive.HivePageSinkProvider;
+import io.prestosql.plugin.hive.HiveSessionProperties;
+import io.prestosql.plugin.hive.HiveWritableTableHandle;
+import io.prestosql.plugin.hive.HiveWriterStats;
+import io.prestosql.plugin.hive.LocationService;
+import io.prestosql.plugin.hive.OrcFileWriterFactory;
+import io.prestosql.plugin.hive.PartitionUpdate;
+import io.prestosql.plugin.hive.metastore.HiveMetastore;
+import io.prestosql.plugin.hive.metastore.HivePageSinkMetadataProvider;
+import io.prestosql.plugin.hive.metastore.SortingColumn;
+import io.prestosql.spi.NodeManager;
+import io.prestosql.spi.PageIndexerFactory;
+import io.prestosql.spi.PageSorter;
+import io.prestosql.spi.connector.ConnectorInsertTableHandle;
+import io.prestosql.spi.connector.ConnectorPageSink;
+import io.prestosql.spi.connector.ConnectorSession;
+import io.prestosql.spi.connector.ConnectorTransactionHandle;
+import io.prestosql.spi.type.TypeManager;
+
+import static
com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
+import static io.airlift.concurrent.Threads.daemonThreadsNamed;
+import static
io.prestosql.plugin.hive.metastore.CachingHiveMetastore.memoizeMetastore;
+import static java.util.Objects.requireNonNull;
+import static java.util.concurrent.Executors.newFixedThreadPool;
+
+public class CarbonDataPageSinkProvider extends HivePageSinkProvider {
+
+ private final Set<HiveFileWriterFactory> fileWriterFactories;
+ private final HdfsEnvironment hdfsEnvironment;
+ private final PageSorter pageSorter;
+ private final HiveMetastore metastore;
+ private final PageIndexerFactory pageIndexerFactory;
+ private final TypeManager typeManager;
+ private final int maxOpenPartitions;
+ private final int maxOpenSortFiles;
+ private final DataSize writerSortBufferSize;
+ private final boolean immutablePartitions;
+ private final LocationService locationService;
+ private final ListeningExecutorService writeVerificationExecutor;
+ private final JsonCodec<PartitionUpdate> partitionUpdateCodec;
+ private final NodeManager nodeManager;
+ private final EventClient eventClient;
+ private final HiveSessionProperties hiveSessionProperties;
+ private final HiveWriterStats hiveWriterStats;
+ private final OrcFileWriterFactory orcFileWriterFactory;
+ private final long perTransactionMetastoreCacheMaximumSize;
+
+ @Inject
+ public CarbonDataPageSinkProvider(Set<HiveFileWriterFactory>
fileWriterFactories,
+ HdfsEnvironment hdfsEnvironment, PageSorter pageSorter, HiveMetastore
metastore,
Review comment:
The superclass has that many parameters in its constructor, so I followed the
same approach. It is called from only one place, and it is also constructed by
the Inject framework, so it should be fine?
##########
File path:
integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonMetadataFactory.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+
+import com.google.inject.Inject;
+import io.airlift.concurrent.BoundedExecutor;
+import io.airlift.json.JsonCodec;
+import io.airlift.log.Logger;
+import io.prestosql.plugin.hive.ForHive;
+import io.prestosql.plugin.hive.HdfsEnvironment;
+import io.prestosql.plugin.hive.HiveConfig;
+import io.prestosql.plugin.hive.HiveMetadata;
+import io.prestosql.plugin.hive.HiveMetadataFactory;
+import io.prestosql.plugin.hive.HivePartitionManager;
+import io.prestosql.plugin.hive.LocationService;
+import io.prestosql.plugin.hive.NodeVersion;
+import io.prestosql.plugin.hive.PartitionUpdate;
+import io.prestosql.plugin.hive.TypeTranslator;
+import io.prestosql.plugin.hive.metastore.CachingHiveMetastore;
+import io.prestosql.plugin.hive.metastore.HiveMetastore;
+import io.prestosql.plugin.hive.metastore.SemiTransactionalHiveMetastore;
+import io.prestosql.plugin.hive.security.AccessControlMetadataFactory;
+import io.prestosql.plugin.hive.statistics.MetastoreHiveStatisticsProvider;
+import io.prestosql.spi.type.TypeManager;
+import org.joda.time.DateTimeZone;
+
+public class CarbonMetadataFactory extends HiveMetadataFactory {
+
+ private static final Logger log = Logger.get(HiveMetadataFactory.class);
+ private final boolean allowCorruptWritesForTesting;
+ private final boolean skipDeletionForAlter;
+ private final boolean skipTargetCleanupOnRollback;
+ private final boolean writesToNonManagedTablesEnabled = true;
+ private final boolean createsOfNonManagedTablesEnabled;
+ private final long perTransactionCacheMaximumSize;
+ private final HiveMetastore metastore;
+ private final HdfsEnvironment hdfsEnvironment;
+ private final HivePartitionManager partitionManager;
+ private final DateTimeZone timeZone;
+ private final TypeManager typeManager;
+ private final LocationService locationService;
+ private final BoundedExecutor renameExecution;
+ private final TypeTranslator typeTranslator;
+ private final String prestoVersion;
+ private final AccessControlMetadataFactory accessControlMetadataFactory;
+ private final JsonCodec partitionUpdateCodec;
+
+ @Inject public CarbonMetadataFactory(HiveConfig hiveConfig, HiveMetastore
metastore,
+ HdfsEnvironment hdfsEnvironment, HivePartitionManager partitionManager,
+ @ForHive ExecutorService executorService, TypeManager typeManager,
+ LocationService locationService, JsonCodec<PartitionUpdate>
partitionUpdateCodec,
+ TypeTranslator typeTranslator, NodeVersion nodeVersion,
+ AccessControlMetadataFactory accessControlMetadataFactory) {
+ this(metastore, hdfsEnvironment, partitionManager,
hiveConfig.getDateTimeZone(),
+ hiveConfig.getMaxConcurrentFileRenames(),
hiveConfig.getAllowCorruptWritesForTesting(),
+ hiveConfig.isSkipDeletionForAlter(),
hiveConfig.isSkipTargetCleanupOnRollback(),
+ hiveConfig.getWritesToNonManagedTablesEnabled(),
+ hiveConfig.getCreatesOfNonManagedTablesEnabled(),
+ hiveConfig.getPerTransactionMetastoreCacheMaximumSize(), typeManager,
locationService,
+ partitionUpdateCodec, executorService, typeTranslator,
nodeVersion.toString(),
+ accessControlMetadataFactory);
+ }
+
+ public CarbonMetadataFactory(HiveMetastore metastore, HdfsEnvironment
hdfsEnvironment,
+ HivePartitionManager partitionManager, DateTimeZone timeZone, int
maxConcurrentFileRenames,
+ boolean allowCorruptWritesForTesting, boolean skipDeletionForAlter,
Review comment:
The superclass has that many parameters in its constructor, so I followed the
same approach. It is called from only one place, and it is also constructed by
the Inject framework, so it should be fine?
##########
File path:
hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
##########
@@ -76,7 +76,7 @@
// TODO Move dictionary generator which is coded in spark to MR framework.
public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable,
ObjectArrayWritable> {
- protected static final String LOAD_MODEL =
"mapreduce.carbontable.load.model";
+ public static final String LOAD_MODEL = "mapreduce.carbontable.load.model";
Review comment:
Done.
##########
File path:
integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputCommitter.java
##########
@@ -52,25 +53,30 @@
@Override
public void setupJob(JobContext jobContext) throws IOException {
-
ThreadLocalSessionInfo.setConfigurationToCurrentThread(jobContext.getConfiguration());
- String a = jobContext.getJobConf().get(JobConf.MAPRED_MAP_TASK_ENV);
Random random = new Random();
JobID jobId = new JobID(UUID.randomUUID().toString(), 0);
TaskID task = new TaskID(jobId, TaskType.MAP, random.nextInt());
TaskAttemptID attemptID = new TaskAttemptID(task, random.nextInt());
org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl context =
new TaskAttemptContextImpl(jobContext.getJobConf(), attemptID);
- CarbonLoadModel carbonLoadModel =
- HiveCarbonUtil.getCarbonLoadModel(jobContext.getConfiguration());
- CarbonTableOutputFormat.setLoadModel(jobContext.getConfiguration(),
carbonLoadModel);
+ CarbonLoadModel carbonLoadModel = null;
+ String encodedString =
jobContext.getJobConf().get(CarbonTableOutputFormat.LOAD_MODEL);
+ if (encodedString != null) {
Review comment:
This is a refactoring of the base code; I added a comment. @kunal642, please
check whether the comment is proper or whether I need to modify it.
##########
File path:
integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputCommitter.java
##########
@@ -52,25 +53,30 @@
@Override
public void setupJob(JobContext jobContext) throws IOException {
-
ThreadLocalSessionInfo.setConfigurationToCurrentThread(jobContext.getConfiguration());
- String a = jobContext.getJobConf().get(JobConf.MAPRED_MAP_TASK_ENV);
Random random = new Random();
JobID jobId = new JobID(UUID.randomUUID().toString(), 0);
TaskID task = new TaskID(jobId, TaskType.MAP, random.nextInt());
TaskAttemptID attemptID = new TaskAttemptID(task, random.nextInt());
org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl context =
new TaskAttemptContextImpl(jobContext.getJobConf(), attemptID);
- CarbonLoadModel carbonLoadModel =
- HiveCarbonUtil.getCarbonLoadModel(jobContext.getConfiguration());
- CarbonTableOutputFormat.setLoadModel(jobContext.getConfiguration(),
carbonLoadModel);
+ CarbonLoadModel carbonLoadModel = null;
+ String encodedString =
jobContext.getJobConf().get(CarbonTableOutputFormat.LOAD_MODEL);
+ if (encodedString != null) {
+ carbonLoadModel =
+ (CarbonLoadModel)
ObjectSerializationUtil.convertStringToObject(encodedString);
+ }
+ if (null == carbonLoadModel) {
+
ThreadLocalSessionInfo.setConfigurationToCurrentThread(jobContext.getConfiguration());
+ String a = jobContext.getJobConf().get(JobConf.MAPRED_MAP_TASK_ENV);
+ carbonLoadModel =
HiveCarbonUtil.getCarbonLoadModel(jobContext.getConfiguration());
+ CarbonTableOutputFormat.setLoadModel(jobContext.getConfiguration(),
carbonLoadModel);
+ String loadModelStr =
jobContext.getConfiguration().get(CarbonTableOutputFormat.LOAD_MODEL);
+ jobContext.getJobConf().set(JobConf.MAPRED_MAP_TASK_ENV, a + ",carbon="
+ loadModelStr);
Review comment:
Added a comment for the base code. @kunal642 please check whether the
comment is proper.
##########
File path:
integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputFormat.java
##########
@@ -92,6 +95,11 @@ public void checkOutputSpecs(FileSystem fileSystem, JobConf
jobConf) throws IOEx
}
String tablePath =
FileFactory.getCarbonFile(carbonLoadModel.getTablePath()).getAbsolutePath();
TaskAttemptID taskAttemptID =
TaskAttemptID.forName(jc.get("mapred.task.id"));
+ if (taskAttemptID == null) {
+ SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmm");
Review comment:
done
##########
File path:
integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
+import org.apache.carbondata.hive.CarbonHiveSerDe;
+import org.apache.carbondata.hive.MapredCarbonOutputFormat;
+import org.apache.carbondata.presto.impl.CarbonTableConfig;
+
+import com.google.common.collect.ImmutableList;
+import io.prestosql.plugin.hive.HiveFileWriter;
+import io.prestosql.plugin.hive.HiveType;
+import io.prestosql.plugin.hive.HiveWriteUtils;
+import io.prestosql.spi.Page;
+import io.prestosql.spi.PrestoException;
+import io.prestosql.spi.block.Block;
+import io.prestosql.spi.type.Type;
+import io.prestosql.spi.type.TypeManager;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import
org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT;
+
+/**
+ * This class implements HiveFileWriter and it creates the carbonFileWriter to
write the age data
Review comment:
It is page data. Changed.
##########
File path:
integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
+import org.apache.carbondata.hive.CarbonHiveSerDe;
+import org.apache.carbondata.hive.MapredCarbonOutputFormat;
+import org.apache.carbondata.presto.impl.CarbonTableConfig;
+
+import com.google.common.collect.ImmutableList;
+import io.prestosql.plugin.hive.HiveFileWriter;
+import io.prestosql.plugin.hive.HiveType;
+import io.prestosql.plugin.hive.HiveWriteUtils;
+import io.prestosql.spi.Page;
+import io.prestosql.spi.PrestoException;
+import io.prestosql.spi.block.Block;
+import io.prestosql.spi.type.Type;
+import io.prestosql.spi.type.TypeManager;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import
org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT;
+
+/**
+ * This class implements HiveFileWriter and it creates the carbonFileWriter to
write the age data
+ * sent from presto.
+ */
+public class CarbonDataFileWriter implements HiveFileWriter {
+
+ private static final Logger LOG =
+ LogServiceFactory.getLogService(CarbonDataFileWriter.class.getName());
+
+ private final JobConf configuration;
+ private Path outPutPath;
Review comment:
done
##########
File path:
integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
+import org.apache.carbondata.hive.CarbonHiveSerDe;
+import org.apache.carbondata.hive.MapredCarbonOutputFormat;
+import org.apache.carbondata.presto.impl.CarbonTableConfig;
+
+import com.google.common.collect.ImmutableList;
+import io.prestosql.plugin.hive.HiveFileWriter;
+import io.prestosql.plugin.hive.HiveType;
+import io.prestosql.plugin.hive.HiveWriteUtils;
+import io.prestosql.spi.Page;
+import io.prestosql.spi.PrestoException;
+import io.prestosql.spi.block.Block;
+import io.prestosql.spi.type.Type;
+import io.prestosql.spi.type.TypeManager;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import
org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT;
+
+/**
+ * This class implements HiveFileWriter and it creates the carbonFileWriter to
write the age data
+ * sent from presto.
+ */
+public class CarbonDataFileWriter implements HiveFileWriter {
+
+ private static final Logger LOG =
+ LogServiceFactory.getLogService(CarbonDataFileWriter.class.getName());
+
+ private final JobConf configuration;
+ private Path outPutPath;
+ private final FileSinkOperator.RecordWriter recordWriter;
+ private final CarbonHiveSerDe serDe;
+ private final int fieldCount;
+ private final Object row;
+ private final SettableStructObjectInspector tableInspector;
+ private final List<StructField> structFields;
+ private final HiveWriteUtils.FieldSetter[] setters;
+
+ private boolean isCommitDone;
+
+ public CarbonDataFileWriter(Path outPutPath, List<String> inputColumnNames,
Properties properties,
+ JobConf configuration, TypeManager typeManager) throws SerDeException {
+ this.outPutPath = requireNonNull(outPutPath, "path is null");
+ this.outPutPath = new Path(properties.getProperty("location"));
+ outPutPath = new Path(properties.getProperty("location"));
Review comment:
done
##########
File path:
integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
+import org.apache.carbondata.hive.CarbonHiveSerDe;
+import org.apache.carbondata.hive.MapredCarbonOutputFormat;
+import org.apache.carbondata.presto.impl.CarbonTableConfig;
+
+import com.google.common.collect.ImmutableList;
+import io.prestosql.plugin.hive.HiveFileWriter;
+import io.prestosql.plugin.hive.HiveType;
+import io.prestosql.plugin.hive.HiveWriteUtils;
+import io.prestosql.spi.Page;
+import io.prestosql.spi.PrestoException;
+import io.prestosql.spi.block.Block;
+import io.prestosql.spi.type.Type;
+import io.prestosql.spi.type.TypeManager;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import
org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT;
+
+/**
+ * This class implements HiveFileWriter and it creates the carbonFileWriter to
write the age data
+ * sent from presto.
+ */
+public class CarbonDataFileWriter implements HiveFileWriter {
+
+ private static final Logger LOG =
+ LogServiceFactory.getLogService(CarbonDataFileWriter.class.getName());
+
+ private final JobConf configuration;
+ private Path outPutPath;
+ private final FileSinkOperator.RecordWriter recordWriter;
+ private final CarbonHiveSerDe serDe;
+ private final int fieldCount;
+ private final Object row;
+ private final SettableStructObjectInspector tableInspector;
+ private final List<StructField> structFields;
+ private final HiveWriteUtils.FieldSetter[] setters;
+
+ private boolean isCommitDone;
+
+ public CarbonDataFileWriter(Path outPutPath, List<String> inputColumnNames,
Properties properties,
+ JobConf configuration, TypeManager typeManager) throws SerDeException {
+ this.outPutPath = requireNonNull(outPutPath, "path is null");
+ this.outPutPath = new Path(properties.getProperty("location"));
+ outPutPath = new Path(properties.getProperty("location"));
+ this.configuration = requireNonNull(configuration, "conf is null");
+ List<String> columnNames = Arrays
+ .asList(properties.getProperty(IOConstants.COLUMNS,
"").split(CarbonCommonConstants.COMMA));
+ List<Type> fileColumnTypes =
+ HiveType.toHiveTypes(properties.getProperty(IOConstants.COLUMNS_TYPES,
"")).stream()
+ .map(hiveType -> hiveType.getType(typeManager)).collect(toList());
+ fieldCount = columnNames.size();
Review comment:
done
##########
File path:
integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
+import org.apache.carbondata.hive.CarbonHiveSerDe;
+import org.apache.carbondata.hive.MapredCarbonOutputFormat;
+import org.apache.carbondata.presto.impl.CarbonTableConfig;
+
+import com.google.common.collect.ImmutableList;
+import io.prestosql.plugin.hive.HiveFileWriter;
+import io.prestosql.plugin.hive.HiveType;
+import io.prestosql.plugin.hive.HiveWriteUtils;
+import io.prestosql.spi.Page;
+import io.prestosql.spi.PrestoException;
+import io.prestosql.spi.block.Block;
+import io.prestosql.spi.type.Type;
+import io.prestosql.spi.type.TypeManager;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import
org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT;
+
+/**
+ * This class implements HiveFileWriter and it creates the carbonFileWriter to
write the age data
+ * sent from presto.
+ */
+public class CarbonDataFileWriter implements HiveFileWriter {
+
+ private static final Logger LOG =
+ LogServiceFactory.getLogService(CarbonDataFileWriter.class.getName());
+
+ private final JobConf configuration;
+ private Path outPutPath;
+ private final FileSinkOperator.RecordWriter recordWriter;
+ private final CarbonHiveSerDe serDe;
+ private final int fieldCount;
+ private final Object row;
+ private final SettableStructObjectInspector tableInspector;
+ private final List<StructField> structFields;
+ private final HiveWriteUtils.FieldSetter[] setters;
+
+ private boolean isCommitDone;
+
+ public CarbonDataFileWriter(Path outPutPath, List<String> inputColumnNames,
Properties properties,
+ JobConf configuration, TypeManager typeManager) throws SerDeException {
+ this.outPutPath = requireNonNull(outPutPath, "path is null");
+ this.outPutPath = new Path(properties.getProperty("location"));
+ outPutPath = new Path(properties.getProperty("location"));
+ this.configuration = requireNonNull(configuration, "conf is null");
+ List<String> columnNames = Arrays
+ .asList(properties.getProperty(IOConstants.COLUMNS,
"").split(CarbonCommonConstants.COMMA));
+ List<Type> fileColumnTypes =
+ HiveType.toHiveTypes(properties.getProperty(IOConstants.COLUMNS_TYPES,
"")).stream()
+ .map(hiveType -> hiveType.getType(typeManager)).collect(toList());
+ fieldCount = columnNames.size();
+ serDe = new CarbonHiveSerDe();
+ serDe.initialize(configuration, properties);
+ tableInspector = (ArrayWritableObjectInspector) serDe.getObjectInspector();
+
+ structFields = ImmutableList.copyOf(
Review comment:
done
##########
File path:
integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
+import org.apache.carbondata.hive.CarbonHiveSerDe;
+import org.apache.carbondata.hive.MapredCarbonOutputFormat;
+import org.apache.carbondata.presto.impl.CarbonTableConfig;
+
+import com.google.common.collect.ImmutableList;
+import io.prestosql.plugin.hive.HiveFileWriter;
+import io.prestosql.plugin.hive.HiveType;
+import io.prestosql.plugin.hive.HiveWriteUtils;
+import io.prestosql.spi.Page;
+import io.prestosql.spi.PrestoException;
+import io.prestosql.spi.block.Block;
+import io.prestosql.spi.type.Type;
+import io.prestosql.spi.type.TypeManager;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import
org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT;
+
+/**
+ * This class implements HiveFileWriter and it creates the carbonFileWriter to
write the age data
+ * sent from presto.
+ */
+public class CarbonDataFileWriter implements HiveFileWriter {
+
+ private static final Logger LOG =
+ LogServiceFactory.getLogService(CarbonDataFileWriter.class.getName());
+
+ private final JobConf configuration;
+ private Path outPutPath;
+ private final FileSinkOperator.RecordWriter recordWriter;
+ private final CarbonHiveSerDe serDe;
+ private final int fieldCount;
+ private final Object row;
+ private final SettableStructObjectInspector tableInspector;
+ private final List<StructField> structFields;
+ private final HiveWriteUtils.FieldSetter[] setters;
+
+ private boolean isCommitDone;
+
+ public CarbonDataFileWriter(Path outPutPath, List<String> inputColumnNames,
Properties properties,
+ JobConf configuration, TypeManager typeManager) throws SerDeException {
+ this.outPutPath = requireNonNull(outPutPath, "path is null");
+ this.outPutPath = new Path(properties.getProperty("location"));
+ outPutPath = new Path(properties.getProperty("location"));
+ this.configuration = requireNonNull(configuration, "conf is null");
+ List<String> columnNames = Arrays
+ .asList(properties.getProperty(IOConstants.COLUMNS,
"").split(CarbonCommonConstants.COMMA));
+ List<Type> fileColumnTypes =
+ HiveType.toHiveTypes(properties.getProperty(IOConstants.COLUMNS_TYPES,
"")).stream()
+ .map(hiveType -> hiveType.getType(typeManager)).collect(toList());
+ fieldCount = columnNames.size();
+ serDe = new CarbonHiveSerDe();
+ serDe.initialize(configuration, properties);
+ tableInspector = (ArrayWritableObjectInspector) serDe.getObjectInspector();
+
+ structFields = ImmutableList.copyOf(
+ inputColumnNames.stream().map(tableInspector::getStructFieldRef)
+ .collect(toImmutableList()));
+
+ row = tableInspector.create();
+
+ setters = new HiveWriteUtils.FieldSetter[structFields.size()];
+ for (int i = 0; i < setters.length; i++) {
+ setters[i] = HiveWriteUtils.createFieldSetter(tableInspector, row,
structFields.get(i),
+ fileColumnTypes.get(structFields.get(i).getFieldID()));
+ }
+ String encodedLoadModel =
configuration.get(CarbonTableConfig.CARBON_PRESTO_LOAD_MODEL);
+ if (StringUtils.isNotEmpty(encodedLoadModel)) {
+ configuration.set(CarbonTableOutputFormat.LOAD_MODEL, encodedLoadModel);
+ }
+ try {
+ boolean compress = HiveConf.getBoolVar(configuration, COMPRESSRESULT);
+ Object writer =
+
Class.forName(MapredCarbonOutputFormat.class.getName()).getConstructor().newInstance();
+ recordWriter = ((MapredCarbonOutputFormat<?>) writer)
+ .getHiveRecordWriter(this.configuration, outPutPath, Text.class,
compress,
+ properties, Reporter.NULL);
+ } catch (Exception e) {
+ LOG.error("error while initializing writer", e);
+ throw new RuntimeException("writer class not found");
+ }
+ }
+
+ @Override public long getWrittenBytes() {
+ if (isCommitDone) {
+ try {
+ return
outPutPath.getFileSystem(configuration).getFileStatus(outPutPath).getLen();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+ return 0;
+ }
+
+ @Override public long getSystemMemoryUsage() {
+ return 0;
+ }
+
+ @Override public void appendRows(Page dataPage) {
+ for (int position = 0; position < dataPage.getPositionCount(); position++)
{
+ appendRow(dataPage, position);
+ }
+ }
+
+ public void appendRow(Page dataPage, int position) {
Review comment:
changed
##########
File path:
integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
+import org.apache.carbondata.hive.CarbonHiveSerDe;
+import org.apache.carbondata.hive.MapredCarbonOutputFormat;
+import org.apache.carbondata.presto.impl.CarbonTableConfig;
+
+import com.google.common.collect.ImmutableList;
+import io.prestosql.plugin.hive.HiveFileWriter;
+import io.prestosql.plugin.hive.HiveType;
+import io.prestosql.plugin.hive.HiveWriteUtils;
+import io.prestosql.spi.Page;
+import io.prestosql.spi.PrestoException;
+import io.prestosql.spi.block.Block;
+import io.prestosql.spi.type.Type;
+import io.prestosql.spi.type.TypeManager;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import
org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT;
+
+/**
+ * This class implements HiveFileWriter and it creates the carbonFileWriter to
write the page data
+ * sent from presto.
+ */
+public class CarbonDataFileWriter implements HiveFileWriter {
+
+ private static final Logger LOG =
+ LogServiceFactory.getLogService(CarbonDataFileWriter.class.getName());
+
+ private final JobConf configuration;
+ private Path outPutPath;
+ private final FileSinkOperator.RecordWriter recordWriter;
+ private final CarbonHiveSerDe serDe;
+ private final int fieldCount;
+ private final Object row;
+ private final SettableStructObjectInspector tableInspector;
+ private final List<StructField> structFields;
+ private final HiveWriteUtils.FieldSetter[] setters;
+
+ private boolean isCommitDone;
+
+ public CarbonDataFileWriter(Path outPutPath, List<String> inputColumnNames,
Properties properties,
+ JobConf configuration, TypeManager typeManager) throws SerDeException {
+ this.outPutPath = requireNonNull(outPutPath, "path is null");
+ this.outPutPath = new Path(properties.getProperty("location"));
+ outPutPath = new Path(properties.getProperty("location"));
+ this.configuration = requireNonNull(configuration, "conf is null");
+ List<String> columnNames = Arrays
+ .asList(properties.getProperty(IOConstants.COLUMNS,
"").split(CarbonCommonConstants.COMMA));
+ List<Type> fileColumnTypes =
+ HiveType.toHiveTypes(properties.getProperty(IOConstants.COLUMNS_TYPES,
"")).stream()
+ .map(hiveType -> hiveType.getType(typeManager)).collect(toList());
+ fieldCount = columnNames.size();
+ serDe = new CarbonHiveSerDe();
+ serDe.initialize(configuration, properties);
+ tableInspector = (ArrayWritableObjectInspector) serDe.getObjectInspector();
+
+ structFields = ImmutableList.copyOf(
+ inputColumnNames.stream().map(tableInspector::getStructFieldRef)
+ .collect(toImmutableList()));
+
+ row = tableInspector.create();
+
+ setters = new HiveWriteUtils.FieldSetter[structFields.size()];
+ for (int i = 0; i < setters.length; i++) {
+ setters[i] = HiveWriteUtils.createFieldSetter(tableInspector, row,
structFields.get(i),
+ fileColumnTypes.get(structFields.get(i).getFieldID()));
+ }
+ String encodedLoadModel =
configuration.get(CarbonTableConfig.CARBON_PRESTO_LOAD_MODEL);
+ if (StringUtils.isNotEmpty(encodedLoadModel)) {
+ configuration.set(CarbonTableOutputFormat.LOAD_MODEL, encodedLoadModel);
+ }
+ try {
+ boolean compress = HiveConf.getBoolVar(configuration, COMPRESSRESULT);
+ Object writer =
+
Class.forName(MapredCarbonOutputFormat.class.getName()).getConstructor().newInstance();
+ recordWriter = ((MapredCarbonOutputFormat<?>) writer)
+ .getHiveRecordWriter(this.configuration, outPutPath, Text.class,
compress,
+ properties, Reporter.NULL);
+ } catch (Exception e) {
+ LOG.error("error while initializing writer", e);
+ throw new RuntimeException("writer class not found");
+ }
+ }
+
+ @Override public long getWrittenBytes() {
+ if (isCommitDone) {
+ try {
+ return
outPutPath.getFileSystem(configuration).getFileStatus(outPutPath).getLen();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+ return 0;
+ }
+
+ @Override public long getSystemMemoryUsage() {
+ return 0;
+ }
+
+ @Override public void appendRows(Page dataPage) {
+ for (int position = 0; position < dataPage.getPositionCount(); position++)
{
+ appendRow(dataPage, position);
+ }
+ }
+
+ public void appendRow(Page dataPage, int position) {
+ for (int field = 0; field < fieldCount; field++) {
+ Block block = dataPage.getBlock(field);
+ if (block.isNull(position)) {
+ tableInspector.setStructFieldData(row, structFields.get(field), null);
+ } else {
+ setters[field].setField(block, position);
+ }
+ }
+
+ try {
+ recordWriter.write(serDe.serialize(row, tableInspector));
+ } catch (SerDeException | IOException e) {
+ throw new PrestoException(HIVE_WRITER_DATA_ERROR, e);
+ }
+ }
+
+ @Override public void commit() {
+ try {
+ recordWriter.close(false);
+ } catch (Exception ex) {
+ LOG.error("Error while closing the record writer", ex);
+ throw new RuntimeException(ex);
+ }
+ isCommitDone = true;
+ }
+
+ @Override public void rollback() {
+
Review comment:
modified
##########
File path:
integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoInsertIntoTableTestCase.scala
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.integrationtest
+
+import java.io.File
+import java.util
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuiteLike}
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile,
CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.schema.SchemaReader
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier,
CarbonTableIdentifier}
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.presto.server.PrestoServer
+import org.apache.carbondata.presto.util.CarbonDataStoreCreator
+
+class PrestoInsertIntoTableTestCase extends FunSuiteLike with
BeforeAndAfterAll with BeforeAndAfterEach {
+
+ private val logger = LogServiceFactory
+ .getLogService(classOf[PrestoAllDataTypeTest].getCanonicalName)
+
+ private val rootPath = new File(this.getClass.getResource("/").getPath
+ + "../../../..").getCanonicalPath
+ private val storePath = s"$rootPath/integration/presto/target/store"
+ private val systemPath = s"$rootPath/integration/presto/target/system"
+ private val prestoServer = new PrestoServer
+
+ override def beforeAll: Unit = {
+
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME,
+ "Presto")
+ val map = new util.HashMap[String, String]()
+ map.put("hive.metastore", "file")
+ map.put("hive.metastore.catalog.dir", s"file://$storePath")
+ map.put("hive.allow-drop-table", "true")
+ prestoServer.startServer("testdb", map)
+ prestoServer.execute("drop schema if exists testdb")
+ prestoServer.execute("create schema testdb")
+ }
+
+ override protected def beforeEach(): Unit = {
+ val query = "create table testdb.testtable(ID int, date date, country
varchar, name varchar, phonetype varchar, serialname varchar,salary
decimal(6,1), bonus decimal(8,6), monthlyBonus decimal(5,3), dob timestamp,
shortField smallint, iscurrentemployee boolean) with(format='CARBONDATA') "
+ createTable(query, "testdb", "testtable")
+ }
+
+ private def createTable(query: String, databaseName: String, tableName:
String): Unit = {
+ prestoServer.execute(s"drop table if exists ${databaseName}.${tableName}")
+ prestoServer.execute(query)
+ logger.info("Creating The Carbon Store")
+ val absoluteTableIdentifier: AbsoluteTableIdentifier =
getAbsoluteIdentifier(databaseName, tableName)
+ CarbonDataStoreCreator.createTable(absoluteTableIdentifier, true)
+ logger.info(s"\nCarbon store is created at location: $storePath")
+ }
+
+ private def getAbsoluteIdentifier(dbName: String,
+ tableName: String) = {
+ val absoluteTableIdentifier = AbsoluteTableIdentifier.from(
+ storePath + "/" + dbName + "/" + tableName,
+ new CarbonTableIdentifier(dbName,
+ tableName,
+ UUID.randomUUID().toString))
+ absoluteTableIdentifier
+ }
+
+ test("test insert with different storage format names") {
+ val query1 = "create table testdb.testtable(ID int, date date, country
varchar, name varchar, phonetype varchar, serialname varchar,salary
decimal(6,1), bonus decimal(8,6), monthlyBonus decimal(5,3), dob timestamp,
shortField smallint, iscurrentemployee boolean) with(format='CARBONDATA') "
+ val query2 = "create table testdb.testtable(ID int, date date, country
varchar, name varchar, phonetype varchar, serialname varchar,salary
decimal(6,1), bonus decimal(8,6), monthlyBonus decimal(5,3), dob timestamp,
shortField smallint, iscurrentemployee boolean) with(format='CARBON') "
+ val query3 = "create table testdb.testtable(ID int, date date, country
varchar, name varchar, phonetype varchar, serialname varchar,salary
decimal(6,1), bonus decimal(8,6), monthlyBonus decimal(5,3), dob timestamp,
shortField smallint, iscurrentemployee boolean)
with(format='ORG.APACHE.CARBONDATA.FORMAT') "
+ createTable(query1, "testdb", "testtable")
+ createTable(query2, "testdb", "testtable")
+ createTable(query3, "testdb", "testtable")
+ prestoServer.execute("insert into testdb.testtable values(10,
current_date, 'INDIA', 'Chandler', 'qwerty',
'usn20392',10000.0,16.234567,25.678,timestamp '1994-06-14 05:00:09',smallint
'23', true)")
+ val absoluteTableIdentifier: AbsoluteTableIdentifier =
getAbsoluteIdentifier("testdb", "testtable")
+ val carbonTable =
SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier)
+ val segmentPath = CarbonTablePath.getSegmentPath(carbonTable.getTablePath,
"0")
+ assert(FileFactory.getCarbonFile(segmentPath).isFileExist)
+ }
+
+ test("test insert into one segment and check folder structure") {
+ prestoServer.execute("insert into testdb.testtable values(10,
current_date, 'INDIA', 'Chandler', 'qwerty',
'usn20392',10000.0,16.234567,25.678,timestamp '1994-06-14 05:00:09',smallint
'23', true)")
+ prestoServer.execute("insert into testdb.testtable values(10,
current_date, 'INDIA', 'Chandler', 'qwerty',
'usn20392',10000.0,16.234567,25.678,timestamp '1994-06-14 05:00:09',smallint
'23', true)")
+ val absoluteTableIdentifier: AbsoluteTableIdentifier =
getAbsoluteIdentifier("testdb", "testtable")
+ val carbonTable =
SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier)
+ val tablePath = carbonTable.getTablePath
+ val segment0Path = CarbonTablePath.getSegmentPath(tablePath, "0")
+ val segment1Path = CarbonTablePath.getSegmentPath(tablePath, "1")
+ val segment0 = FileFactory.getCarbonFile(segment0Path)
+ assert(segment0.isFileExist)
+ assert(segment0.listFiles(new CarbonFileFilter {
+ override def accept(file: CarbonFile): Boolean = {
+ file.getName.endsWith(CarbonTablePath.CARBON_DATA_EXT) ||
+ file.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)
+ }
+ }).length == 2)
+ val segment1 = FileFactory.getCarbonFile(segment1Path)
+ assert(segment1.isFileExist)
+ assert(segment1.listFiles(new CarbonFileFilter {
+ override def accept(file: CarbonFile): Boolean = {
+ file.getName.endsWith(CarbonTablePath.CARBON_DATA_EXT) ||
+ file.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)
+ }
+ }).length == 2)
+ val segmentsPath = CarbonTablePath.getSegmentFilesLocation(tablePath)
+ assert(FileFactory.getCarbonFile(segmentsPath).isFileExist &&
FileFactory.getCarbonFile(segmentsPath).listFiles(true).size() == 2)
+ val metadataFolderPath = CarbonTablePath.getMetadataPath(tablePath)
+ FileFactory.getCarbonFile(metadataFolderPath).listFiles(new
CarbonFileFilter {
+ override def accept(file: CarbonFile): Boolean = {
+ file.getName.endsWith(CarbonTablePath.TABLE_STATUS_FILE)
+ }
+ })
+ }
+
+ test("test insert into many segments and check segment count and data
count") {
+ prestoServer.execute("insert into testdb.testtable values(10,
current_date, 'INDIA', 'Chandler', 'qwerty',
'usn20392',10000.0,16.234567,25.678,timestamp '1994-06-14 05:00:09',smallint
'23', true)")
+ prestoServer.execute("insert into testdb.testtable values(10,
current_date, 'INDIA', 'Chandler', 'qwerty',
'usn20392',10000.0,16.234567,25.678,timestamp '1998-12-16 10:12:09',smallint
'23', true)")
+ prestoServer.execute("insert into testdb.testtable values(10,
current_date, 'INDIA', 'Chandler', 'qwerty',
'usn20392',10000.0,16.234567,25.678,timestamp '1994-06-14 05:00:09',smallint
'23', true)")
+ prestoServer.execute("insert into testdb.testtable values(10,
current_date, 'INDIA', 'Chandler', 'qwerty',
'usn20392',10000.0,16.234567,25.678,timestamp '1998-12-16 10:12:09',smallint
'23', true)")
+ val absoluteTableIdentifier: AbsoluteTableIdentifier =
getAbsoluteIdentifier("testdb", "testtable")
+ val carbonTable =
SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier)
+ val segmentFoldersLocation =
CarbonTablePath.getPartitionDir(carbonTable.getTablePath)
+
assert(FileFactory.getCarbonFile(segmentFoldersLocation).listFiles(false).size()
== 8)
+ val actualResult1: List[Map[String, Any]] = prestoServer
+ .executeQuery("select count(*) AS RESULT from testdb.testtable")
+ val expectedResult1: List[Map[String, Any]] = List(Map("RESULT" -> 4))
+ assert(actualResult1.equals(expectedResult1))
+ // filter query
+ val actualResult2: List[Map[String, Any]] = prestoServer
+ .executeQuery("select count(*) AS RESULT from testdb.testtable WHERE dob
= timestamp '1998-12-16 10:12:09'")
+ val expectedResult2: List[Map[String, Any]] = List(Map("RESULT" -> 2))
+ assert(actualResult2.equals(expectedResult2))
+ }
+
+ test("test if the table status contains the segment file name for each
load") {
+ prestoServer.execute("insert into testdb.testtable values(10,
current_date, 'INDIA', 'Chandler', 'qwerty',
'usn20392',10000.0,16.234567,25.678,timestamp '1994-06-14 05:00:09',smallint
'23', true)")
+ val absoluteTableIdentifier: AbsoluteTableIdentifier =
getAbsoluteIdentifier("testdb", "testtable")
+ val carbonTable =
SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier)
+ val ssm = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
+ ssm.getValidAndInvalidSegments.getValidSegments.asScala.foreach { segment
=>
+ val loadMetadataDetails = segment.getLoadMetadataDetails
+ assert(loadMetadataDetails.getSegmentFile != null)
+ }
+ }
+
Review comment:
added
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org