Repository: sqoop
Updated Branches:
  refs/heads/trunk 17461e91d -> 739bbce48
SQOOP-3329: Remove Kite dependency from the Sqoop project

(Szabolcs Vasas via Boglarka Egyed)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/739bbce4
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/739bbce4
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/739bbce4

Branch: refs/heads/trunk
Commit: 739bbce48593a82575435f1cc48ca7ebd48537c9
Parents: 17461e9
Author: Boglarka Egyed <[email protected]>
Authored: Fri Jul 20 09:36:39 2018 +0200
Committer: Boglarka Egyed <[email protected]>
Committed: Fri Jul 20 09:36:39 2018 +0200

----------------------------------------------------------------------
 ivy.xml                                         |  10 +-
 ivy/libraries.properties                        |   3 +-
 src/docs/user/hive-notes.txt                    |   8 -
 src/docs/user/import.txt                        |  39 +---
 src/java/org/apache/sqoop/SqoopOptions.java     |   4 +-
 .../ParquetJobConfiguratorImplementation.java   |   3 +-
 .../parquet/kite/KiteMergeParquetReducer.java   |  36 ---
 .../kite/KiteParquetExportJobConfigurator.java  |  51 -----
 .../parquet/kite/KiteParquetExportMapper.java   |  37 ----
 .../kite/KiteParquetImportJobConfigurator.java  |  98 ---------
 .../parquet/kite/KiteParquetImportMapper.java   |  55 -----
 .../kite/KiteParquetJobConfiguratorFactory.java |  45 ----
 .../kite/KiteParquetMergeJobConfigurator.java   | 103 ---------
 .../parquet/kite/KiteParquetUtils.java          | 217 -------------------
 .../org/apache/sqoop/tool/BaseSqoopTool.java    |  10 -
 src/test/org/apache/sqoop/TestMerge.java        |  15 +-
 .../org/apache/sqoop/TestParquetExport.java     |  47 ----
 .../org/apache/sqoop/TestParquetImport.java     |  42 +---
 .../org/apache/sqoop/hive/TestHiveImport.java   | 159 --------------
 .../apache/sqoop/tool/TestBaseSqoopTool.java    |   5 +-
 20 files changed, 20 insertions(+), 967 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/739bbce4/ivy.xml
----------------------------------------------------------------------
diff --git a/ivy.xml b/ivy.xml
index 1f587f3..796ef70 100644
--- a/ivy.xml
+++ b/ivy.xml
@@ -114,15 +114,7 @@ under the License.
conf="common->default;redist->default"/> <dependency org="org.apache.commons" name="commons-lang3" rev="${commons-lang3.version}" conf="common->default;redist->default"/> - <dependency org="org.kitesdk" name="kite-data-mapreduce" rev="${kite-data.version}" - conf="common->default;redist->default"> - <exclude org="org.apache.avro" module="avro" /> - </dependency> - <dependency org="org.kitesdk" name="kite-data-hive" rev="${kite-data.version}" - conf="common->default;redist->default"> - <exclude org="com.twitter" module="parquet-hive-bundle"/> - <exclude org="org.apache.avro" module="avro" /> - </dependency> + <dependency org="com.twitter" name="parquet-avro" rev="${parquet.version}" conf="common->default;redist->default"/> <dependency org="com.fasterxml.jackson.core" name="jackson-databind" rev="${jackson-databind.version}" conf="common->default;redist->default" /> http://git-wip-us.apache.org/repos/asf/sqoop/blob/739bbce4/ivy/libraries.properties ---------------------------------------------------------------------- diff --git a/ivy/libraries.properties b/ivy/libraries.properties index 565a8bf..c506ca8 100644 --- a/ivy/libraries.properties +++ b/ivy/libraries.properties @@ -20,8 +20,6 @@ avro.version=1.8.1 -kite-data.version=1.1.0 - checkstyle.version=5.0 commons-cli.version=1.2 @@ -62,3 +60,4 @@ hbase.version=1.2.4 hcatalog.version=1.2.1 jackson-databind.version=2.9.5 +parquet.version=1.6.0 http://git-wip-us.apache.org/repos/asf/sqoop/blob/739bbce4/src/docs/user/hive-notes.txt ---------------------------------------------------------------------- diff --git a/src/docs/user/hive-notes.txt b/src/docs/user/hive-notes.txt index af97d94..d58c4d6 100644 --- a/src/docs/user/hive-notes.txt +++ b/src/docs/user/hive-notes.txt @@ -28,11 +28,3 @@ direct mapping (for example, +DATE+, +TIME+, and +TIMESTAMP+) will be coerced to +STRING+ in Hive. The +NUMERIC+ and +DECIMAL+ SQL types will be coerced to +DOUBLE+. In these cases, Sqoop will emit a warning in its log messages informing you of the loss of precision. - -Parquet Support in Hive -~~~~~~~~~~~~~~~~~~~~~~~ - -When using the Kite Dataset API based Parquet implementation in order to contact the Hive MetaStore -from a MapReduce job, a delegation token will be fetched and passed. HIVE_CONF_DIR and HIVE_HOME must be set appropriately to add -Hive to the runtime classpath. Otherwise, importing/exporting into Hive in Parquet -format may not work. http://git-wip-us.apache.org/repos/asf/sqoop/blob/739bbce4/src/docs/user/import.txt ---------------------------------------------------------------------- diff --git a/src/docs/user/import.txt b/src/docs/user/import.txt index a2c16d9..79f7101 100644 --- a/src/docs/user/import.txt +++ b/src/docs/user/import.txt @@ -60,7 +60,7 @@ Argument Description +\--as-sequencefile+ Imports data to SequenceFiles +\--as-textfile+ Imports data as plain text (default) +\--as-parquetfile+ Imports data to Parquet Files -+\--parquet-configurator-implementation+ Sets the implementation used during Parquet import. Supported values: kite, hadoop. ++\--parquet-configurator-implementation+ Sets the implementation used during Parquet import. Supported value: hadoop. +\--boundary-query <statement>+ Boundary query to use for creating splits +\--columns <col,col,col...>+ Columns to import from table +\--delete-target-dir+ Delete the import target directory\ @@ -448,35 +448,14 @@ and Avro files. 
Parquet support +++++++++++++++ -Sqoop has two different implementations for importing data in Parquet format: +Sqoop has only one implementation now for importing data in Parquet format which is based on the Parquet Hadoop API. +Note that the legacy Kite Dataset API based implementation is removed so users have to make sure that both ++\--parquet-configurator-implementation+ option and +parquetjob.configurator.implementation+ property are unset or +set to "hadoop". -- Kite Dataset API based implementation (default, legacy) -- Parquet Hadoop API based implementation (recommended) - -The users can specify the desired implementation with the +\--parquet-configurator-implementation+ option: - ----- -$ sqoop import --connect jdbc:mysql://db.foo.com/corp --table EMPLOYEES --as-parquetfile --parquet-configurator-implementation kite ----- - ----- -$ sqoop import --connect jdbc:mysql://db.foo.com/corp --table EMPLOYEES --as-parquetfile --parquet-configurator-implementation hadoop ----- - -If the +\--parquet-configurator-implementation+ option is not present then Sqoop will check the value of +parquetjob.configurator.implementation+ -property (which can be specified using -D in the Sqoop command or in the site.xml). If that value is also absent Sqoop will -default to Kite Dataset API based implementation. - -The Kite Dataset API based implementation executes the import command on a different code -path than the text import: it creates the Hive table based on the generated Avro schema by connecting to the Hive metastore. -This can be a disadvantage since sometimes moving from the text file format to the Parquet file format can lead to many -unexpected behavioral changes. Kite checks the Hive table schema before importing the data into it so if the user wants -to import some data which has a schema incompatible with the Hive table's schema Sqoop will throw an error. This implementation -uses snappy codec for compression by default and apart from this it supports the bzip codec too. - -The Parquet Hadoop API based implementation builds the Hive CREATE TABLE statement and executes the -LOAD DATA INPATH command just like the text import does. Unlike Kite it also supports connecting to HiveServer2 (using the +\--hs2-url+ option) -so it provides better security features. This implementation does not check the Hive table's schema before importing so +The default Parquet import implementation builds the Hive CREATE TABLE statement and executes the +LOAD DATA INPATH command just like the text import does. It supports connecting to HiveServer2 (using the +\--hs2-url+ option) +but it does not check the Hive table's schema before importing so it is possible that the user can successfully import data into Hive but they get an error during a Hive read operation later. It does not use any compression by default but supports snappy and bzip codecs. @@ -487,6 +466,8 @@ $ sqoop import --connect jdbc:mysql://db.foo.com/corp --table EMPLOYEES --as-par --parquet-configurator-implementation hadoop --hs2-url "jdbc:hive2://hs2.foo.com:10000" --hs2-keytab "/path/to/keytab" ---- +Note that +\--parquet-configurator-implementation hadoop+ is now optional. 
+ Enabling Logical Types in Avro and Parquet import for numbers ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ http://git-wip-us.apache.org/repos/asf/sqoop/blob/739bbce4/src/java/org/apache/sqoop/SqoopOptions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/SqoopOptions.java b/src/java/org/apache/sqoop/SqoopOptions.java index cc1b752..f97dbfd 100644 --- a/src/java/org/apache/sqoop/SqoopOptions.java +++ b/src/java/org/apache/sqoop/SqoopOptions.java @@ -53,7 +53,7 @@ import org.apache.sqoop.util.RandomHash; import org.apache.sqoop.util.StoredAsProperty; import static org.apache.sqoop.Sqoop.SQOOP_RETHROW_PROPERTY; -import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.KITE; +import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.HADOOP; import static org.apache.sqoop.orm.ClassWriter.toJavaIdentifier; /** @@ -1161,7 +1161,7 @@ public class SqoopOptions implements Cloneable { // set escape column mapping to true this.escapeColumnMappingEnabled = true; - this.parquetConfiguratorImplementation = KITE; + this.parquetConfiguratorImplementation = HADOOP; } /** http://git-wip-us.apache.org/repos/asf/sqoop/blob/739bbce4/src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorImplementation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorImplementation.java b/src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorImplementation.java index 050c854..c6b576d 100644 --- a/src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorImplementation.java +++ b/src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorImplementation.java @@ -19,14 +19,13 @@ package org.apache.sqoop.mapreduce.parquet; import org.apache.sqoop.mapreduce.parquet.hadoop.HadoopParquetJobConfiguratorFactory; -import org.apache.sqoop.mapreduce.parquet.kite.KiteParquetJobConfiguratorFactory; /** * An enum containing all the implementations available for {@link ParquetJobConfiguratorFactory}. * The enumeration constants are also used to instantiate concrete {@link ParquetJobConfiguratorFactory} objects. */ public enum ParquetJobConfiguratorImplementation { - KITE(KiteParquetJobConfiguratorFactory.class), HADOOP(HadoopParquetJobConfiguratorFactory.class); + HADOOP(HadoopParquetJobConfiguratorFactory.class); private Class<? extends ParquetJobConfiguratorFactory> configuratorFactoryClass; http://git-wip-us.apache.org/repos/asf/sqoop/blob/739bbce4/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteMergeParquetReducer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteMergeParquetReducer.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteMergeParquetReducer.java deleted file mode 100644 index 02816d7..0000000 --- a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteMergeParquetReducer.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.sqoop.mapreduce.parquet.kite; - -import org.apache.avro.generic.GenericRecord; -import org.apache.hadoop.io.NullWritable; -import org.apache.sqoop.mapreduce.MergeParquetReducer; - -import java.io.IOException; - -/** - * An implementation of {@link MergeParquetReducer} which depends on the Kite Dataset API. - */ -public class KiteMergeParquetReducer extends MergeParquetReducer<GenericRecord, NullWritable> { - - @Override - protected void write(Context context, GenericRecord record) throws IOException, InterruptedException { - context.write(record, null); - } -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/739bbce4/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportJobConfigurator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportJobConfigurator.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportJobConfigurator.java deleted file mode 100644 index 6ebc5a3..0000000 --- a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportJobConfigurator.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.sqoop.mapreduce.parquet.kite; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.InputFormat; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator; -import org.apache.sqoop.util.FileSystemUtil; -import org.kitesdk.data.mapreduce.DatasetKeyInputFormat; - -import java.io.IOException; - -/** - * An implementation of {@link ParquetExportJobConfigurator} which depends on the Kite Dataset API. - */ -public class KiteParquetExportJobConfigurator implements ParquetExportJobConfigurator { - - @Override - public void configureInputFormat(Job job, Path inputPath) throws IOException { - String uri = "dataset:" + FileSystemUtil.makeQualified(inputPath, job.getConfiguration()); - DatasetKeyInputFormat.configure(job).readFrom(uri); - } - - @Override - public Class<? extends Mapper> getMapperClass() { - return KiteParquetExportMapper.class; - } - - @Override - public Class<? 
extends InputFormat> getInputFormatClass() { - return DatasetKeyInputFormat.class; - } -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/739bbce4/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportMapper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportMapper.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportMapper.java deleted file mode 100644 index 122ff3f..0000000 --- a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportMapper.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.sqoop.mapreduce.parquet.kite; - -import org.apache.avro.generic.GenericRecord; -import org.apache.hadoop.io.NullWritable; -import org.apache.sqoop.mapreduce.GenericRecordExportMapper; - -import java.io.IOException; - -/** - * An implementation of {@link GenericRecordExportMapper} which depends on the Kite Dataset API. - */ -public class KiteParquetExportMapper extends GenericRecordExportMapper<GenericRecord, NullWritable> { - - @Override - protected void map(GenericRecord key, NullWritable val, Context context) throws IOException, InterruptedException { - context.write(toSqoopRecord(key), NullWritable.get()); - } - -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/739bbce4/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportJobConfigurator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportJobConfigurator.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportJobConfigurator.java deleted file mode 100644 index 7e179a2..0000000 --- a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportJobConfigurator.java +++ /dev/null @@ -1,98 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.sqoop.mapreduce.parquet.kite; - -import org.apache.avro.Schema; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.OutputFormat; -import org.apache.sqoop.SqoopOptions; -import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator; -import org.apache.sqoop.util.FileSystemUtil; -import org.kitesdk.data.Datasets; -import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat; - -import java.io.IOException; - -/** - * An implementation of {@link ParquetImportJobConfigurator} which depends on the Kite Dataset API. - */ -public class KiteParquetImportJobConfigurator implements ParquetImportJobConfigurator { - - public static final Log LOG = LogFactory.getLog(KiteParquetImportJobConfigurator.class.getName()); - - @Override - public void configureMapper(Job job, Schema schema, SqoopOptions options, String tableName, Path destination) throws IOException { - JobConf conf = (JobConf) job.getConfiguration(); - String uri = getKiteUri(conf, options, tableName, destination); - KiteParquetUtils.WriteMode writeMode; - - if (options.doHiveImport()) { - if (options.doOverwriteHiveTable()) { - writeMode = KiteParquetUtils.WriteMode.OVERWRITE; - } else { - writeMode = KiteParquetUtils.WriteMode.APPEND; - if (Datasets.exists(uri)) { - LOG.warn("Target Hive table '" + tableName + "' exists! Sqoop will " + - "append data into the existing Hive table. Consider using " + - "--hive-overwrite, if you do NOT intend to do appending."); - } - } - } else { - // Note that there is no such an import argument for overwriting HDFS - // dataset, so overwrite mode is not supported yet. - // Sqoop's append mode means to merge two independent datasets. We - // choose DEFAULT as write mode. - writeMode = KiteParquetUtils.WriteMode.DEFAULT; - } - KiteParquetUtils.configureImportJob(conf, schema, uri, writeMode); - } - - @Override - public Class<? extends Mapper> getMapperClass() { - return KiteParquetImportMapper.class; - } - - @Override - public Class<? extends OutputFormat> getOutputFormatClass() { - return DatasetKeyOutputFormat.class; - } - - @Override - public boolean isHiveImportNeeded() { - return false; - } - - private String getKiteUri(Configuration conf, SqoopOptions options, String tableName, Path destination) throws IOException { - if (options.doHiveImport()) { - String hiveDatabase = options.getHiveDatabaseName() == null ? "default" : - options.getHiveDatabaseName(); - String hiveTable = options.getHiveTableName() == null ? 
tableName : - options.getHiveTableName(); - return String.format("dataset:hive:/%s/%s", hiveDatabase, hiveTable); - } else { - return "dataset:" + FileSystemUtil.makeQualified(destination, conf); - } - } -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/739bbce4/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportMapper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportMapper.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportMapper.java deleted file mode 100644 index 0a91e4a..0000000 --- a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportMapper.java +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.sqoop.mapreduce.parquet.kite; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.sqoop.avro.AvroUtil; -import org.apache.sqoop.lib.LargeObjectLoader; -import org.apache.sqoop.mapreduce.ParquetImportMapper; - -import java.io.IOException; - -import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_AVRO_SCHEMA_KEY; - -/** - * An implementation of {@link ParquetImportMapper} which depends on the Kite Dataset API. 
- */ -public class KiteParquetImportMapper extends ParquetImportMapper<GenericRecord, Void> { - - @Override - protected LargeObjectLoader createLobLoader(Context context) throws IOException, InterruptedException { - Configuration conf = context.getConfiguration(); - Path workPath = new Path(conf.get("sqoop.kite.lob.extern.dir", "/tmp/sqoop-parquet-" + context.getTaskAttemptID())); - return new LargeObjectLoader(conf, workPath); - } - - @Override - protected Schema getAvroSchema(Configuration configuration) { - String schemaString = configuration.get(SQOOP_PARQUET_AVRO_SCHEMA_KEY); - return AvroUtil.parseAvroSchema(schemaString); - } - - @Override - protected void write(Context context, GenericRecord record) throws IOException, InterruptedException { - context.write(record, null); - } -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/739bbce4/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetJobConfiguratorFactory.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetJobConfiguratorFactory.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetJobConfiguratorFactory.java deleted file mode 100644 index bd07c09..0000000 --- a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetJobConfiguratorFactory.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.sqoop.mapreduce.parquet.kite; - -import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator; -import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator; -import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactory; -import org.apache.sqoop.mapreduce.parquet.ParquetMergeJobConfigurator; - -/** - * A concrete factory implementation which produces configurator objects using the Kite Dataset API. 
- */ -public class KiteParquetJobConfiguratorFactory implements ParquetJobConfiguratorFactory { - - @Override - public ParquetImportJobConfigurator createParquetImportJobConfigurator() { - return new KiteParquetImportJobConfigurator(); - } - - @Override - public ParquetExportJobConfigurator createParquetExportJobConfigurator() { - return new KiteParquetExportJobConfigurator(); - } - - @Override - public ParquetMergeJobConfigurator createParquetMergeJobConfigurator() { - return new KiteParquetMergeJobConfigurator(); - } -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/739bbce4/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetMergeJobConfigurator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetMergeJobConfigurator.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetMergeJobConfigurator.java deleted file mode 100644 index ed045cd..0000000 --- a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetMergeJobConfigurator.java +++ /dev/null @@ -1,103 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.sqoop.mapreduce.parquet.kite; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapreduce.Job; -import org.apache.sqoop.mapreduce.MergeParquetMapper; -import org.apache.sqoop.mapreduce.parquet.ParquetMergeJobConfigurator; -import org.kitesdk.data.Dataset; -import org.kitesdk.data.DatasetDescriptor; -import org.kitesdk.data.Datasets; -import org.kitesdk.data.Formats; -import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat; -import parquet.avro.AvroParquetInputFormat; -import parquet.avro.AvroSchemaConverter; -import parquet.hadoop.Footer; -import parquet.hadoop.ParquetFileReader; -import parquet.schema.MessageType; - -import java.io.IOException; -import java.net.URI; -import java.util.ArrayList; -import java.util.List; - -import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_AVRO_SCHEMA_KEY; - -/** - * An implementation of {@link ParquetMergeJobConfigurator} which depends on the Kite Dataset API. 
- */ -public class KiteParquetMergeJobConfigurator implements ParquetMergeJobConfigurator { - - public static final Log LOG = LogFactory.getLog(KiteParquetMergeJobConfigurator.class.getName()); - - @Override - public void configureParquetMergeJob(Configuration conf, Job job, Path oldPath, Path newPath, - Path finalPath) throws IOException { - try { - FileSystem fileSystem = finalPath.getFileSystem(conf); - LOG.info("Trying to merge parquet files"); - job.setOutputKeyClass(GenericRecord.class); - job.setMapperClass(MergeParquetMapper.class); - job.setReducerClass(KiteMergeParquetReducer.class); - job.setOutputValueClass(NullWritable.class); - - List<Footer> footers = new ArrayList<Footer>(); - FileStatus oldPathfileStatus = fileSystem.getFileStatus(oldPath); - FileStatus newPathfileStatus = fileSystem.getFileStatus(oldPath); - footers.addAll(ParquetFileReader.readFooters(job.getConfiguration(), oldPathfileStatus, true)); - footers.addAll(ParquetFileReader.readFooters(job.getConfiguration(), newPathfileStatus, true)); - - MessageType schema = footers.get(0).getParquetMetadata().getFileMetaData().getSchema(); - AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(); - Schema avroSchema = avroSchemaConverter.convert(schema); - - if (!fileSystem.exists(finalPath)) { - Dataset dataset = createDataset(avroSchema, "dataset:" + finalPath); - DatasetKeyOutputFormat.configure(job).overwrite(dataset); - } else { - DatasetKeyOutputFormat.configure(job).overwrite(new URI("dataset:" + finalPath)); - } - - job.setInputFormatClass(AvroParquetInputFormat.class); - AvroParquetInputFormat.setAvroReadSchema(job, avroSchema); - - conf.set(SQOOP_PARQUET_AVRO_SCHEMA_KEY, avroSchema.toString()); - Class<DatasetKeyOutputFormat> outClass = DatasetKeyOutputFormat.class; - - job.setOutputFormatClass(outClass); - } catch (Exception cnfe) { - throw new IOException(cnfe); - } - } - - public static Dataset createDataset(Schema schema, String uri) { - DatasetDescriptor descriptor = - new DatasetDescriptor.Builder().schema(schema).format(Formats.PARQUET).build(); - return Datasets.create(uri, descriptor, GenericRecord.class); - } -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/739bbce4/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetUtils.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetUtils.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetUtils.java deleted file mode 100644 index a4768c9..0000000 --- a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetUtils.java +++ /dev/null @@ -1,217 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.sqoop.mapreduce.parquet.kite; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.sqoop.avro.AvroSchemaMismatchException; -import org.apache.sqoop.hive.HiveConfig; -import org.kitesdk.data.CompressionType; -import org.kitesdk.data.Dataset; -import org.kitesdk.data.DatasetDescriptor; -import org.kitesdk.data.Datasets; -import org.kitesdk.data.Formats; -import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat; -import org.kitesdk.data.spi.SchemaValidationUtil; - -import java.io.IOException; -import java.lang.reflect.Method; - -import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_AVRO_SCHEMA_KEY; -import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_OUTPUT_CODEC_KEY; - -/** - * Helper class using the Kite Dataset API for setting up a Parquet MapReduce job. - */ -public final class KiteParquetUtils { - - public static final Log LOG = LogFactory.getLog(KiteParquetUtils.class.getName()); - - public static final String HIVE_METASTORE_CLIENT_CLASS = "org.apache.hadoop.hive.metastore.HiveMetaStoreClient"; - - public static final String HIVE_METASTORE_SASL_ENABLED = "hive.metastore.sasl.enabled"; - // Purposefully choosing the same token alias as the one Oozie chooses. - // Make sure we don't generate a new delegation token if oozie - // has already generated one. - public static final String HIVE_METASTORE_TOKEN_ALIAS = "HCat Token"; - - public static final String INCOMPATIBLE_AVRO_SCHEMA_MSG = "Target dataset was created with an incompatible Avro schema. "; - - public static final String HIVE_INCOMPATIBLE_AVRO_SCHEMA_MSG = "You tried to import to an already existing Hive table in " + - "Parquet format. Sqoop maps date/timestamp SQL types to int/bigint Hive types during Hive Parquet import" + - " but it is possible that date/timestamp types were mapped to strings during table" + - " creation. Consider using Sqoop option --map-column-java resolve the mismatch" + - " (e.g. --map-column-java date_field1=String,timestamp_field1=String)."; - - private static final String HIVE_URI_PREFIX = "dataset:hive"; - - private KiteParquetUtils() { - } - - public enum WriteMode { - DEFAULT, APPEND, OVERWRITE - }; - - public static CompressionType getCompressionType(Configuration conf) { - CompressionType defaults = Formats.PARQUET.getDefaultCompressionType(); - String codec = conf.get(SQOOP_PARQUET_OUTPUT_CODEC_KEY, defaults.getName()); - try { - return CompressionType.forName(codec); - } catch (IllegalArgumentException ex) { - LOG.warn(String.format( - "Unsupported compression type '%s'. Fallback to '%s'.", - codec, defaults)); - } - return defaults; - } - - /** - * Configure the import job. The import process will use a Kite dataset to - * write data records into Parquet format internally. The input key class is - * {@link org.apache.sqoop.lib.SqoopRecord}. The output key is - * {@link org.apache.avro.generic.GenericRecord}. 
- */ - public static void configureImportJob(JobConf conf, Schema schema, - String uri, WriteMode writeMode) throws IOException { - Dataset dataset; - - // Add hive delegation token only if we don't already have one. - if (isHiveImport(uri)) { - Configuration hiveConf = HiveConfig.getHiveConf(conf); - if (isSecureMetastore(hiveConf)) { - // Copy hive configs to job config - HiveConfig.addHiveConfigs(hiveConf, conf); - - if (conf.getCredentials().getToken(new Text(HIVE_METASTORE_TOKEN_ALIAS)) == null) { - addHiveDelegationToken(conf); - } - } - } - - if (Datasets.exists(uri)) { - if (WriteMode.DEFAULT.equals(writeMode)) { - throw new IOException("Destination exists! " + uri); - } - - dataset = Datasets.load(uri); - Schema writtenWith = dataset.getDescriptor().getSchema(); - if (!SchemaValidationUtil.canRead(writtenWith, schema)) { - String exceptionMessage = buildAvroSchemaMismatchMessage(isHiveImport(uri)); - throw new AvroSchemaMismatchException(exceptionMessage, writtenWith, schema); - } - } else { - dataset = createDataset(schema, getCompressionType(conf), uri); - } - conf.set(SQOOP_PARQUET_AVRO_SCHEMA_KEY, schema.toString()); - - DatasetKeyOutputFormat.ConfigBuilder builder = - DatasetKeyOutputFormat.configure(conf); - if (WriteMode.OVERWRITE.equals(writeMode)) { - builder.overwrite(dataset); - } else if (WriteMode.APPEND.equals(writeMode)) { - builder.appendTo(dataset); - } else { - builder.writeTo(dataset); - } - } - - private static boolean isHiveImport(String importUri) { - return importUri.startsWith(HIVE_URI_PREFIX); - } - - public static Dataset createDataset(Schema schema, - CompressionType compressionType, String uri) { - DatasetDescriptor descriptor = new DatasetDescriptor.Builder() - .schema(schema) - .format(Formats.PARQUET) - .compressionType(compressionType) - .build(); - return Datasets.create(uri, descriptor, GenericRecord.class); - } - - private static boolean isSecureMetastore(Configuration conf) { - return conf != null && conf.getBoolean(HIVE_METASTORE_SASL_ENABLED, false); - } - - /** - * Add hive delegation token to credentials store. - * @param conf - */ - private static void addHiveDelegationToken(JobConf conf) { - // Need to use reflection since there's no compile time dependency on the client libs. - Class<?> HiveConfClass; - Class<?> HiveMetaStoreClientClass; - - try { - HiveMetaStoreClientClass = Class.forName(HIVE_METASTORE_CLIENT_CLASS); - } catch (ClassNotFoundException ex) { - LOG.error("Could not load " + HIVE_METASTORE_CLIENT_CLASS - + " when adding hive delegation token. " - + "Make sure HIVE_CONF_DIR is set correctly.", ex); - throw new RuntimeException("Couldn't fetch delegation token.", ex); - } - - try { - HiveConfClass = Class.forName(HiveConfig.HIVE_CONF_CLASS); - } catch (ClassNotFoundException ex) { - LOG.error("Could not load " + HiveConfig.HIVE_CONF_CLASS - + " when adding hive delegation token." 
- + " Make sure HIVE_CONF_DIR is set correctly.", ex); - throw new RuntimeException("Couldn't fetch delegation token.", ex); - } - - try { - Object client = HiveMetaStoreClientClass.getConstructor(HiveConfClass).newInstance( - HiveConfClass.getConstructor(Configuration.class, Class.class).newInstance(conf, Configuration.class) - ); - // getDelegationToken(String kerberosPrincial) - Method getDelegationTokenMethod = HiveMetaStoreClientClass.getMethod("getDelegationToken", String.class); - Object tokenStringForm = getDelegationTokenMethod.invoke(client, UserGroupInformation.getLoginUser().getShortUserName()); - - // Load token - Token<DelegationTokenIdentifier> metastoreToken = new Token<DelegationTokenIdentifier>(); - metastoreToken.decodeFromUrlString(tokenStringForm.toString()); - conf.getCredentials().addToken(new Text(HIVE_METASTORE_TOKEN_ALIAS), metastoreToken); - - LOG.debug("Successfully fetched hive metastore delegation token. " + metastoreToken); - } catch (Exception ex) { - LOG.error("Couldn't fetch delegation token.", ex); - throw new RuntimeException("Couldn't fetch delegation token.", ex); - } - } - - private static String buildAvroSchemaMismatchMessage(boolean hiveImport) { - String exceptionMessage = INCOMPATIBLE_AVRO_SCHEMA_MSG; - - if (hiveImport) { - exceptionMessage += HIVE_INCOMPATIBLE_AVRO_SCHEMA_MSG; - } - - return exceptionMessage; - } - -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/739bbce4/src/java/org/apache/sqoop/tool/BaseSqoopTool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java index 87fc5e9..9dcbdd5 100644 --- a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java +++ b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java @@ -21,7 +21,6 @@ package org.apache.sqoop.tool; import static java.lang.String.format; import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.PARQUET_JOB_CONFIGURATOR_IMPLEMENTATION_KEY; -import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.KITE; import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.valueOf; import java.io.File; @@ -1587,15 +1586,6 @@ public abstract class BaseSqoopTool extends org.apache.sqoop.tool.SqoopTool { + "importing into SequenceFile format."); } - // Hive import and create hive table not compatible for ParquetFile format when using Kite - if (options.doHiveImport() - && options.doFailIfHiveTableExists() - && options.getFileLayout() == SqoopOptions.FileLayout.ParquetFile - && options.getParquetConfiguratorImplementation() == KITE) { - throw new InvalidOptionsException("Hive import and create hive table is not compatible with " - + "importing into ParquetFile format using Kite."); - } - if (options.doHiveImport() && options.getIncrementalMode().equals(IncrementalMode.DateLastModified)) { throw new InvalidOptionsException(HIVE_IMPORT_WITH_LASTMODIFIED_NOT_SUPPORTED); http://git-wip-us.apache.org/repos/asf/sqoop/blob/739bbce4/src/test/org/apache/sqoop/TestMerge.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/TestMerge.java b/src/test/org/apache/sqoop/TestMerge.java index 2b3280a..b283174 100644 --- a/src/test/org/apache/sqoop/TestMerge.java +++ b/src/test/org/apache/sqoop/TestMerge.java @@ -27,7 +27,6 @@ import java.sql.SQLException; import java.util.Arrays; import java.util.List; 
-import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation; import org.apache.sqoop.testutil.CommonArgs; import org.apache.sqoop.testutil.HsqldbTestServer; import org.apache.sqoop.manager.ConnManager; @@ -54,8 +53,6 @@ import org.apache.sqoop.util.ParquetReader; import org.junit.Before; import org.junit.Test; -import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.HADOOP; -import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.KITE; import static org.junit.Assert.fail; /** @@ -84,8 +81,6 @@ public class TestMerge extends BaseSqoopTestCase { Arrays.asList(new Integer(1), new Integer(43)), Arrays.asList(new Integer(3), new Integer(313))); - private ParquetJobConfiguratorImplementation parquetJobConfiguratorImplementation = KITE; - @Before public void setUp() { super.setUp(); @@ -118,7 +113,6 @@ public class TestMerge extends BaseSqoopTestCase { public SqoopOptions getSqoopOptions(Configuration conf) { SqoopOptions options = new SqoopOptions(conf); options.setConnectString(HsqldbTestServer.getDbUrl()); - options.setParquetConfiguratorImplementation(parquetJobConfiguratorImplementation); return options; } @@ -164,14 +158,7 @@ public class TestMerge extends BaseSqoopTestCase { } @Test - public void testParquetFileMergeHadoop() throws Exception { - parquetJobConfiguratorImplementation = HADOOP; - runMergeTest(SqoopOptions.FileLayout.ParquetFile); - } - - @Test - public void testParquetFileMergeKite() throws Exception { - parquetJobConfiguratorImplementation = KITE; + public void testParquetFileMerge() throws Exception { runMergeTest(SqoopOptions.FileLayout.ParquetFile); } http://git-wip-us.apache.org/repos/asf/sqoop/blob/739bbce4/src/test/org/apache/sqoop/TestParquetExport.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/TestParquetExport.java b/src/test/org/apache/sqoop/TestParquetExport.java index 0fab188..be1d816 100644 --- a/src/test/org/apache/sqoop/TestParquetExport.java +++ b/src/test/org/apache/sqoop/TestParquetExport.java @@ -18,9 +18,6 @@ package org.apache.sqoop; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.sqoop.testutil.ExportJobTestCase; import com.google.common.collect.Lists; @@ -32,8 +29,6 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; import parquet.avro.AvroParquetWriter; import java.io.IOException; @@ -44,7 +39,6 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.UUID; @@ -58,23 +52,11 @@ import static parquet.hadoop.metadata.CompressionCodecName.SNAPPY; /** * Test that we can export Parquet Data Files from HDFS into databases. */ -@RunWith(Parameterized.class) public class TestParquetExport extends ExportJobTestCase { - @Parameterized.Parameters(name = "parquetImplementation = {0}") - public static Iterable<? 
extends Object> parquetImplementationParameters() { - return Arrays.asList("kite", "hadoop"); - } - @Rule public ExpectedException thrown = ExpectedException.none(); - private final String parquetImplementation; - - public TestParquetExport(String parquetImplementation) { - this.parquetImplementation = parquetImplementation; - } - /** * @return an argv for the CodeGenTool to use when creating tables to export. */ @@ -144,8 +126,6 @@ public class TestParquetExport extends ExportJobTestCase { /** * Create a data file that gets exported to the db. - * Sqoop uses Kite to export Parquet files so it requires a Kite metadata directory to be present next to the files - * but since we do not use Kite in our test cases anymore we generate the .metadata directory here. * @param numRecords how many records to write to the file. */ protected void createParquetFile(int numRecords, @@ -153,7 +133,6 @@ public class TestParquetExport extends ExportJobTestCase { Schema schema = buildSchema(extraCols); - createMetadataDir(schema); String fileName = UUID.randomUUID().toString() + ".parquet"; Path filePath = new Path(getTablePath(), fileName); try (AvroParquetWriter parquetWriter = new AvroParquetWriter(filePath, schema, SNAPPY, DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE)) { @@ -167,25 +146,6 @@ public class TestParquetExport extends ExportJobTestCase { } } - private void createMetadataDir(Schema schema) throws IOException { - final String descriptorFileTemplate = "location=file\\:%s\n" + - " version=1\n" + - " compressionType=snappy\n" + - " format=parquet\n"; - Path metadataDirPath = new Path(getTablePath(), ".metadata"); - Path schemaFile = new Path(metadataDirPath, "schema.avsc"); - Path descriptorFile = new Path(metadataDirPath, "descriptor.properties"); - FileSystem fileSystem = getTablePath().getFileSystem(new Configuration()); - fileSystem.mkdirs(metadataDirPath); - - try (FSDataOutputStream fileOs = fileSystem.create(schemaFile)) { - fileOs.write(schema.toString().getBytes()); - } - try (FSDataOutputStream fileOs = fileSystem.create(descriptorFile)) { - fileOs.write(String.format(descriptorFileTemplate, getTablePath()).getBytes()); - } - } - private Schema buildSchema(ColumnGenerator... 
extraCols) { List<Field> fields = new ArrayList<Field>(); fields.add(buildField("id", Schema.Type.INT)); @@ -492,11 +452,4 @@ public class TestParquetExport extends ExportJobTestCase { thrown.reportMissingExceptionWithMessage("Expected Exception on missing Parquet fields"); runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1))); } - - @Override - protected Configuration getConf() { - Configuration conf = super.getConf(); - conf.set("parquetjob.configurator.implementation", parquetImplementation); - return conf; - } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/739bbce4/src/test/org/apache/sqoop/TestParquetImport.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/TestParquetImport.java b/src/test/org/apache/sqoop/TestParquetImport.java index b1488e8..2810e31 100644 --- a/src/test/org/apache/sqoop/TestParquetImport.java +++ b/src/test/org/apache/sqoop/TestParquetImport.java @@ -18,7 +18,6 @@ package org.apache.sqoop; -import org.apache.hadoop.conf.Configuration; import org.apache.sqoop.testutil.CommonArgs; import org.apache.sqoop.testutil.HsqldbTestServer; import org.apache.sqoop.testutil.ImportJobTestCase; @@ -31,9 +30,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.sqoop.util.ParquetReader; import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; import java.io.IOException; import java.nio.ByteBuffer; @@ -48,32 +44,15 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.junit.Assume.assumeTrue; /** * Tests --as-parquetfile. */ -@RunWith(Parameterized.class) public class TestParquetImport extends ImportJobTestCase { public static final Log LOG = LogFactory .getLog(TestParquetImport.class.getName()); - private static String PARQUET_CONFIGURATOR_IMPLEMENTATION_KITE = "kite"; - - private static String PARQUET_CONFIGURATOR_IMPLEMENTATION_HADOOP = "hadoop"; - - @Parameters(name = "parquetImplementation = {0}") - public static Iterable<? extends Object> parquetImplementationParameters() { - return Arrays.asList(PARQUET_CONFIGURATOR_IMPLEMENTATION_KITE, PARQUET_CONFIGURATOR_IMPLEMENTATION_HADOOP); - } - - private final String parquetImplementation; - - public TestParquetImport(String parquetImplementation) { - this.parquetImplementation = parquetImplementation; - } - /** * Create the argv to pass to Sqoop. * @@ -136,27 +115,17 @@ public class TestParquetImport extends ImportJobTestCase { } @Test - public void testHadoopGzipCompression() throws IOException { - assumeTrue(PARQUET_CONFIGURATOR_IMPLEMENTATION_HADOOP.equals(parquetImplementation)); + public void testGzipCompression() throws IOException { runParquetImportTest("gzip"); } - @Test - public void testKiteDeflateCompression() throws IOException { - assumeTrue(PARQUET_CONFIGURATOR_IMPLEMENTATION_KITE.equals(parquetImplementation)); - // The current Kite-based Parquet writing implementation uses GZIP compression codec when Deflate is specified. - // See: org.kitesdk.data.spi.filesystem.ParquetAppender.getCompressionCodecName() - runParquetImportTest("deflate", "gzip"); - } - /** * This test case is added to document that the deflate codec is not supported with * the Hadoop Parquet implementation so Sqoop throws an exception when it is specified. 
* @throws IOException */ @Test(expected = IOException.class) - public void testHadoopDeflateCompression() throws IOException { - assumeTrue(PARQUET_CONFIGURATOR_IMPLEMENTATION_HADOOP.equals(parquetImplementation)); + public void testDeflateCompression() throws IOException { runParquetImportTest("deflate"); } @@ -334,11 +303,4 @@ public class TestParquetImport extends ImportJobTestCase { assertEquals(Type.NULL, field.schema().getTypes().get(0).getType()); assertEquals(type, field.schema().getTypes().get(1).getType()); } - - @Override - protected Configuration getConf() { - Configuration conf = super.getConf(); - conf.set("parquetjob.configurator.implementation", parquetImplementation); - return conf; - } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/739bbce4/src/test/org/apache/sqoop/hive/TestHiveImport.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/hive/TestHiveImport.java b/src/test/org/apache/sqoop/hive/TestHiveImport.java index 436f0e5..a6c8e10 100644 --- a/src/test/org/apache/sqoop/hive/TestHiveImport.java +++ b/src/test/org/apache/sqoop/hive/TestHiveImport.java @@ -23,20 +23,12 @@ import java.io.File; import java.io.FileReader; import java.io.IOException; import java.util.ArrayList; -import java.util.List; -import org.apache.sqoop.Sqoop; - -import org.apache.avro.Schema; -import org.apache.avro.SchemaBuilder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.sqoop.avro.AvroSchemaMismatchException; -import org.apache.sqoop.mapreduce.parquet.kite.KiteParquetUtils; -import org.apache.sqoop.util.ParquetReader; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -54,7 +46,6 @@ import org.apache.sqoop.tool.SqoopTool; import org.apache.commons.cli.ParseException; import org.junit.rules.ExpectedException; -import static java.util.Collections.sort; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -284,54 +275,6 @@ public class TestHiveImport extends ImportJobTestCase { getArgv(false, null), new ImportTool()); } - /** Test that strings and ints are handled in the normal fashion as parquet - * file. 
*/ - @Test - public void testNormalHiveImportAsParquet() throws IOException { - final String TABLE_NAME = "normal_hive_import_as_parquet"; - setCurTableName(TABLE_NAME); - setNumCols(3); - String [] types = getTypes(); - String [] vals = { "'test'", "42", "'somestring'" }; - String [] extraArgs = {"--as-parquetfile"}; - - runImportTest(TABLE_NAME, types, vals, "", getArgv(false, extraArgs), - new ImportTool()); - verifyHiveDataset(new Object[][]{{"test", 42, "somestring"}}); - } - - private void verifyHiveDataset(Object[][] valsArray) { - List<String> expected = getExpectedLines(valsArray); - List<String> result = new ParquetReader(getTablePath()).readAllInCsv(); - - sort(expected); - sort(result); - - assertEquals(expected, result); - } - - private List<String> getExpectedLines(Object[][] valsArray) { - List<String> expectations = new ArrayList<>(); - if (valsArray != null) { - for (Object[] vals : valsArray) { - expectations.add(toCsv(vals)); - } - } - return expectations; - } - - private String toCsv(Object[] vals) { - StringBuilder result = new StringBuilder(); - - for (Object val : vals) { - result.append(val).append(","); - } - - result.deleteCharAt(result.length() - 1); - - return result.toString(); - } - /** Test that table is created in hive with no data import. */ @Test public void testCreateOnlyHiveImport() throws IOException { @@ -365,108 +308,6 @@ public class TestHiveImport extends ImportJobTestCase { new CreateHiveTableTool()); } - /** - * Test that table is created in hive and replaces the existing table if - * any. - */ - @Test - public void testCreateOverwriteHiveImportAsParquet() throws IOException { - final String TABLE_NAME = "create_overwrite_hive_import_as_parquet"; - setCurTableName(TABLE_NAME); - setNumCols(3); - String [] types = getTypes(); - String [] vals = { "'test'", "42", "'somestring'" }; - String [] extraArgs = {"--as-parquetfile"}; - ImportTool tool = new ImportTool(); - - runImportTest(TABLE_NAME, types, vals, "", getArgv(false, extraArgs), tool); - verifyHiveDataset(new Object[][]{{"test", 42, "somestring"}}); - - String [] valsToOverwrite = { "'test2'", "24", "'somestring2'" }; - String [] extraArgsForOverwrite = {"--as-parquetfile", "--hive-overwrite"}; - runImportTest(TABLE_NAME, types, valsToOverwrite, "", - getArgv(false, extraArgsForOverwrite), tool); - verifyHiveDataset(new Object[][] {{"test2", 24, "somestring2"}}); - } - - @Test - public void testHiveImportAsParquetWhenTableExistsWithIncompatibleSchema() throws Exception { - final String TABLE_NAME = "HIVE_IMPORT_AS_PARQUET_EXISTING_TABLE"; - setCurTableName(TABLE_NAME); - setNumCols(3); - - String [] types = { "VARCHAR(32)", "INTEGER", "DATE" }; - String [] vals = { "'test'", "42", "'2009-12-31'" }; - String [] extraArgs = {"--as-parquetfile"}; - - createHiveDataSet(TABLE_NAME); - - createTableWithColTypes(types, vals); - - thrown.expect(AvroSchemaMismatchException.class); - thrown.expectMessage(KiteParquetUtils.INCOMPATIBLE_AVRO_SCHEMA_MSG + KiteParquetUtils.HIVE_INCOMPATIBLE_AVRO_SCHEMA_MSG); - - SqoopOptions sqoopOptions = getSqoopOptions(getConf()); - sqoopOptions.setThrowOnError(true); - Sqoop sqoop = new Sqoop(new ImportTool(), getConf(), sqoopOptions); - sqoop.run(getArgv(false, extraArgs)); - - } - - private void createHiveDataSet(String tableName) { - Schema dataSetSchema = SchemaBuilder - .record(tableName) - .fields() - .name(getColName(0)).type().nullable().stringType().noDefault() - .name(getColName(1)).type().nullable().stringType().noDefault() - 
.name(getColName(2)).type().nullable().stringType().noDefault() - .endRecord(); - String dataSetUri = "dataset:hive:/default/" + tableName; - KiteParquetUtils.createDataset(dataSetSchema, KiteParquetUtils.getCompressionType(new Configuration()), dataSetUri); - } - - /** - * Test that records are appended to an existing table. - */ - @Test - public void testAppendHiveImportAsParquet() throws IOException { - final String TABLE_NAME = "append_hive_import_as_parquet"; - setCurTableName(TABLE_NAME); - setNumCols(3); - String [] types = getTypes(); - String [] vals = { "'test'", "42", "'somestring'" }; - String [] extraArgs = {"--as-parquetfile"}; - String [] args = getArgv(false, extraArgs); - ImportTool tool = new ImportTool(); - - runImportTest(TABLE_NAME, types, vals, "", args, tool); - verifyHiveDataset(new Object[][]{{"test", 42, "somestring"}}); - - String [] valsToAppend = { "'test2'", "4242", "'somestring2'" }; - runImportTest(TABLE_NAME, types, valsToAppend, "", args, tool); - verifyHiveDataset(new Object[][] { - {"test2", 4242, "somestring2"}, {"test", 42, "somestring"}}); - } - - /** - * Test hive create and --as-parquetfile options validation. - */ - @Test - public void testCreateHiveImportAsParquet() throws ParseException, InvalidOptionsException { - final String TABLE_NAME = "CREATE_HIVE_IMPORT_AS_PARQUET"; - setCurTableName(TABLE_NAME); - setNumCols(3); - String [] extraArgs = {"--as-parquetfile", "--create-hive-table"}; - ImportTool tool = new ImportTool(); - - thrown.expect(InvalidOptionsException.class); - thrown.reportMissingExceptionWithMessage("Expected InvalidOptionsException during Hive table creation with " + - "--as-parquetfile"); - tool.validateOptions(tool.parseArguments(getArgv(false, extraArgs), null, - null, true)); - } - - /** Test that dates are coerced properly to strings. */ @Test public void testDate() throws IOException { http://git-wip-us.apache.org/repos/asf/sqoop/blob/739bbce4/src/test/org/apache/sqoop/tool/TestBaseSqoopTool.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/tool/TestBaseSqoopTool.java b/src/test/org/apache/sqoop/tool/TestBaseSqoopTool.java index dbda8b7..5571b25 100644 --- a/src/test/org/apache/sqoop/tool/TestBaseSqoopTool.java +++ b/src/test/org/apache/sqoop/tool/TestBaseSqoopTool.java @@ -28,7 +28,6 @@ import org.junit.rules.ExpectedException; import org.mockito.Mockito; import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.HADOOP; -import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.KITE; import static org.hamcrest.CoreMatchers.sameInstance; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; @@ -124,7 +123,7 @@ public class TestBaseSqoopTool { public void testApplyCommonOptionsThrowsWhenInvalidParquetJobConfigurationImplementationIsSet() throws Exception { when(mockCommandLine.getOptionValue("parquet-configurator-implementation")).thenReturn("this_is_definitely_not_valid"); - exception.expectMessage("Invalid Parquet job configurator implementation is set: this_is_definitely_not_valid. Supported values are: [KITE, HADOOP]"); + exception.expectMessage("Invalid Parquet job configurator implementation is set: this_is_definitely_not_valid. 
Supported values are: [HADOOP]"); testBaseSqoopTool.applyCommonOptions(mockCommandLine, testSqoopOptions); } @@ -132,6 +131,6 @@ public class TestBaseSqoopTool { public void testApplyCommonOptionsDoesNotChangeDefaultParquetJobConfigurationImplementationWhenNothingIsSet() throws Exception { testBaseSqoopTool.applyCommonOptions(mockCommandLine, testSqoopOptions); - assertEquals(KITE, testSqoopOptions.getParquetConfiguratorImplementation()); + assertEquals(HADOOP, testSqoopOptions.getParquetConfiguratorImplementation()); } }
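
For quick reference, a minimal usage sketch of the behaviour this change leaves in place. The connect string, database and table name are the illustrative values already used in src/docs/user/import.txt, not new requirements, and the expected rejection of "kite" is inferred from the updated TestBaseSqoopTool error message rather than taken from a documented example.

----
# Parquet import now defaults to the Hadoop implementation; no extra flag is needed.
$ sqoop import --connect jdbc:mysql://db.foo.com/corp --table EMPLOYEES --as-parquetfile

# Equivalent, with the (now optional) implementation flag spelled out.
$ sqoop import --connect jdbc:mysql://db.foo.com/corp --table EMPLOYEES --as-parquetfile \
    --parquet-configurator-implementation hadoop

# Requesting the removed Kite implementation is expected to fail option validation with
# "Invalid Parquet job configurator implementation is set: ... Supported values are: [HADOOP]".
$ sqoop import --connect jdbc:mysql://db.foo.com/corp --table EMPLOYEES --as-parquetfile \
    --parquet-configurator-implementation kite
----

The same presumably applies when the implementation is chosen via the +parquetjob.configurator.implementation+ property (for example with -D on the command line or in the site.xml): only "hadoop" remains a valid value.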
