Repository: sqoop Updated Branches: refs/heads/sqoop2 2f4da466e -> 55d1db2ba
SQOOP-2788: Sqoop2: Parquet support for HdfsConnector (Abraham Fine via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/55d1db2b Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/55d1db2b Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/55d1db2b Branch: refs/heads/sqoop2 Commit: 55d1db2ba3cdd45e60daeb18ef5529f2be282f1f Parents: 2f4da46 Author: Jarek Jarcec Cecho <[email protected]> Authored: Fri Feb 5 12:51:08 2016 -0800 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Fri Feb 5 12:51:08 2016 -0800 ---------------------------------------------------------------------- ...pache.sqoop.connector-classloader.properties | 2 + connector/connector-hdfs/pom.xml | 10 + .../sqoop/connector/hdfs/HdfsExtractor.java | 87 ++++++++- .../apache/sqoop/connector/hdfs/HdfsLoader.java | 28 ++- .../connector/hdfs/configuration/ToFormat.java | 5 + .../hdfs/hdfsWriter/GenericHdfsWriter.java | 3 +- .../hdfs/hdfsWriter/HdfsParquetWriter.java | 66 +++++++ .../hdfs/hdfsWriter/HdfsSequenceWriter.java | 5 +- .../hdfs/hdfsWriter/HdfsTextWriter.java | 3 +- .../apache/sqoop/connector/hdfs/TestLoader.java | 109 +++++++++-- .../sqoop/connector/common/SqoopAvroUtils.java | 3 +- .../idf/AVROIntermediateDataFormat.java | 14 +- pom.xml | 11 ++ test/pom.xml | 10 + .../connector/hdfs/NullValueTest.java | 86 +++++++-- .../integration/connector/hdfs/ParquetTest.java | 183 +++++++++++++++++++ 16 files changed, 577 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/55d1db2b/common/src/main/resources/org.apache.sqoop.connector-classloader.properties ---------------------------------------------------------------------- diff --git a/common/src/main/resources/org.apache.sqoop.connector-classloader.properties b/common/src/main/resources/org.apache.sqoop.connector-classloader.properties index 
c0082cc..0311f88 100644 --- a/common/src/main/resources/org.apache.sqoop.connector-classloader.properties +++ b/common/src/main/resources/org.apache.sqoop.connector-classloader.properties @@ -52,6 +52,8 @@ system.classes.default=java.,\ org.apache.log4j.,\ org.apache.sqoop.,\ -org.apache.sqoop.connector.,\ + org.apache.avro.,\ + org.codehaus.jackson.,\ org.xerial.snappy.,\ sqoop.properties,\ sqoop_bootstrap.properties http://git-wip-us.apache.org/repos/asf/sqoop/blob/55d1db2b/connector/connector-hdfs/pom.xml ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/pom.xml b/connector/connector-hdfs/pom.xml index 5996314..37cf3fa 100644 --- a/connector/connector-hdfs/pom.xml +++ b/connector/connector-hdfs/pom.xml @@ -73,6 +73,16 @@ limitations under the License. <scope>provided</scope> </dependency> + <dependency> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-hadoop</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-avro</artifactId> + </dependency> + </dependencies> <build> http://git-wip-us.apache.org/repos/asf/sqoop/blob/55d1db2b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java index 9ef2a05..5973463 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java @@ -19,10 +19,14 @@ package org.apache.sqoop.connector.hdfs; import java.io.IOException; import java.io.UnsupportedEncodingException; +import java.nio.charset.Charset; import java.security.PrivilegedExceptionAction; +import java.util.Arrays; 
+import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Seekable; @@ -33,13 +37,18 @@ import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.util.LineReader; import org.apache.log4j.Logger; +import org.apache.parquet.avro.AvroReadSupport; +import org.apache.parquet.hadoop.ParquetInputFormat; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.common.SqoopIDFUtils; import org.apache.sqoop.connector.hadoop.security.SecurityUtils; import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration; import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration; +import org.apache.sqoop.connector.idf.AVROIntermediateDataFormat; import org.apache.sqoop.error.code.HdfsConnectorError; import org.apache.sqoop.etl.io.DataWriter; import org.apache.sqoop.job.etl.Extractor; @@ -55,6 +64,10 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura public static final Logger LOG = Logger.getLogger(HdfsExtractor.class); + // the sequence of bytes that appears at the beginning and end of every + // parquet file + private static final byte[] PARQUET_MAGIC = "PAR1".getBytes(Charset.forName("ASCII")); + private Configuration conf = new Configuration(); private DataWriter dataWriter; private Schema schema; @@ -85,7 +98,7 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura private void extractFile(LinkConfiguration linkConfiguration, 
FromJobConfiguration fromJobConfiguration, Path file, long start, long length, String[] locations) - throws IOException { + throws IOException, InterruptedException { long end = start + length; LOG.info("Extracting file " + file); LOG.info("\t from offset " + start); @@ -93,8 +106,10 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura LOG.info("\t of length " + length); if(isSequenceFile(file)) { extractSequenceFile(linkConfiguration, fromJobConfiguration, file, start, length, locations); - } else { - extractTextFile(linkConfiguration, fromJobConfiguration, file, start, length, locations); + } else if(isParquetFile(file)) { + extractParquetFile(linkConfiguration, fromJobConfiguration, file, start, length, locations); + } else { + extractTextFile(linkConfiguration, fromJobConfiguration, file, start, length); } } @@ -136,7 +151,7 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura @SuppressWarnings("resource") private void extractTextFile(LinkConfiguration linkConfiguration, FromJobConfiguration fromJobConfiguration, - Path file, long start, long length, String[] locations) + Path file, long start, long length) throws IOException { LOG.info("Extracting text file"); long end = start + length; @@ -185,6 +200,35 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura filestream.close(); } + private void extractParquetFile(LinkConfiguration linkConfiguration, + FromJobConfiguration fromJobConfiguration, + Path file, long start, long length, + String[] locations) throws IOException, InterruptedException { + // Parquet does not expose a way to directly deal with file splits + // except through the ParquetInputFormat (ParquetInputSplit is @private) + FileSplit fileSplit = new FileSplit(file, start, length, locations); + conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, AvroReadSupport.class.getName()); + ParquetInputFormat parquetInputFormat = new ParquetInputFormat(); + + // 
ParquetReader needs a TaskAttemptContext to pass through the + // configuration object. + TaskAttemptContext taskAttemptContext = new SqoopTaskAttemptContext(conf); + + RecordReader<Void, GenericRecord> recordReader = parquetInputFormat.createRecordReader(fileSplit, taskAttemptContext); + recordReader.initialize(fileSplit, taskAttemptContext); + + AVROIntermediateDataFormat idf = new AVROIntermediateDataFormat(schema); + while (recordReader.nextKeyValue() != false) { + GenericRecord record = recordReader.getCurrentValue(); + rowsRead++; + if (schema instanceof ByteArraySchema) { + dataWriter.writeArrayRecord(new Object[]{idf.toObject(record)}); + } else { + dataWriter.writeArrayRecord(idf.toObject(record)); + } + } + } + @Override public long getRowsRead() { return rowsRead; @@ -207,6 +251,41 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura return true; } + private boolean isParquetFile(Path file) { + try { + FileSystem fileSystem = file.getFileSystem(conf); + FileStatus fileStatus = fileSystem.getFileStatus(file); + FSDataInputStream fsDataInputStream = fileSystem.open(file); + + long fileLength = fileStatus.getLen(); + + byte[] fileStart = new byte[PARQUET_MAGIC.length]; + fsDataInputStream.readFully(fileStart); + + if (LOG.isDebugEnabled()) { + LOG.error("file start: " + new String(fileStart, Charset.forName("ASCII"))); + } + + if (!Arrays.equals(fileStart, PARQUET_MAGIC)) { + return false; + } + + long fileEndIndex = fileLength - PARQUET_MAGIC.length; + fsDataInputStream.seek(fileEndIndex); + + byte[] fileEnd = new byte[PARQUET_MAGIC.length]; + fsDataInputStream.readFully(fileEnd); + + if (LOG.isDebugEnabled()) { + LOG.error("file end: " + new String(fileEnd, Charset.forName("ASCII"))); + } + + return Arrays.equals(fileEnd, PARQUET_MAGIC); + } catch (IOException e) { + return false; + } + } + private void extractRow(LinkConfiguration linkConfiguration, FromJobConfiguration fromJobConfiguration, Text line) throws 
UnsupportedEncodingException { if (schema instanceof ByteArraySchema) { dataWriter.writeArrayRecord(new Object[] {line.toString().getBytes(SqoopIDFUtils.BYTE_FIELD_CHARSET)}); http://git-wip-us.apache.org/repos/asf/sqoop/blob/55d1db2b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java index 5de20c6..7cef93c 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java @@ -32,6 +32,7 @@ import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration; import org.apache.sqoop.connector.hdfs.configuration.ToFormat; import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration; import org.apache.sqoop.connector.hdfs.hdfsWriter.GenericHdfsWriter; +import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsParquetWriter; import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsSequenceWriter; import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsTextWriter; import org.apache.sqoop.error.code.HdfsConnectorError; @@ -89,7 +90,7 @@ public class HdfsLoader extends Loader<LinkConfiguration, ToJobConfiguration> { GenericHdfsWriter filewriter = getWriter(toJobConfig); - filewriter.initialize(filepath, conf, codec); + filewriter.initialize(filepath, context.getSchema(), conf, codec); if (!HdfsUtils.hasCustomFormat(linkConfiguration, toJobConfig) || (context.getSchema() instanceof ByteArraySchema)) { String record; @@ -119,8 +120,14 @@ public class HdfsLoader extends Loader<LinkConfiguration, ToJobConfiguration> { } private GenericHdfsWriter getWriter(ToJobConfiguration toJobConf) { - return (toJobConf.toJobConfig.outputFormat == ToFormat.SEQUENCE_FILE) 
? new HdfsSequenceWriter() - : new HdfsTextWriter(); + switch(toJobConf.toJobConfig.outputFormat) { + case SEQUENCE_FILE: + return new HdfsSequenceWriter(); + case PARQUET_FILE: + return new HdfsParquetWriter(); + default: + return new HdfsTextWriter(); + } } private String getCompressionCodecName(ToJobConfiguration toJobConf) { @@ -151,11 +158,16 @@ public class HdfsLoader extends Loader<LinkConfiguration, ToJobConfiguration> { //TODO: We should probably support configurable extensions at some point private static String getExtension(ToJobConfiguration toJobConf, CompressionCodec codec) { - if (toJobConf.toJobConfig.outputFormat == ToFormat.SEQUENCE_FILE) - return ".seq"; - if (codec == null) - return ".txt"; - return codec.getDefaultExtension(); + switch(toJobConf.toJobConfig.outputFormat) { + case SEQUENCE_FILE: + return ".seq"; + case PARQUET_FILE: + return ".parquet"; + default: + if (codec == null) + return ".txt"; + return codec.getDefaultExtension(); + } } /* (non-Javadoc) http://git-wip-us.apache.org/repos/asf/sqoop/blob/55d1db2b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToFormat.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToFormat.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToFormat.java index 27d121f..ffce583 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToFormat.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToFormat.java @@ -30,4 +30,9 @@ public enum ToFormat { * Sequence file */ SEQUENCE_FILE, + + /** + * Parquet file + */ + PARQUET_FILE, } http://git-wip-us.apache.org/repos/asf/sqoop/blob/55d1db2b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/GenericHdfsWriter.java 
---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/GenericHdfsWriter.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/GenericHdfsWriter.java index 2ccccc4..31023e7 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/GenericHdfsWriter.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/GenericHdfsWriter.java @@ -20,12 +20,13 @@ package org.apache.sqoop.connector.hdfs.hdfsWriter; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.sqoop.schema.Schema; import java.io.IOException; public abstract class GenericHdfsWriter { - public abstract void initialize(Path filepath, Configuration conf, CompressionCodec codec) throws IOException; + public abstract void initialize(Path filepath, Schema schema, Configuration conf, CompressionCodec codec) throws IOException; public abstract void write(String csv) throws IOException; http://git-wip-us.apache.org/repos/asf/sqoop/blob/55d1db2b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsParquetWriter.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsParquetWriter.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsParquetWriter.java new file mode 100644 index 0000000..4ec813b --- /dev/null +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsParquetWriter.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.hdfs.hdfsWriter; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.parquet.avro.AvroParquetWriter; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.sqoop.connector.idf.AVROIntermediateDataFormat; +import org.apache.sqoop.schema.Schema; + +import java.io.IOException; + +public class HdfsParquetWriter extends GenericHdfsWriter { + + private ParquetWriter avroParquetWriter; + private Schema sqoopSchema; + private AVROIntermediateDataFormat avroIntermediateDataFormat; + + @Override + public void initialize(Path filepath, Schema schema, Configuration conf, CompressionCodec hadoopCodec) throws IOException { + sqoopSchema = schema; + avroIntermediateDataFormat = new AVROIntermediateDataFormat(sqoopSchema); + + CompressionCodecName parquetCodecName; + if (hadoopCodec == null) { + parquetCodecName = CompressionCodecName.UNCOMPRESSED; + } else { + parquetCodecName = CompressionCodecName.fromCompressionCodec(hadoopCodec.getClass()); + } + + avroParquetWriter = + AvroParquetWriter.builder(filepath) + .withSchema(avroIntermediateDataFormat.getAvroSchema()) + 
.withCompressionCodec(parquetCodecName) + .withConf(conf).build(); + + } + + @Override + public void write(String csv) throws IOException { + avroParquetWriter.write(avroIntermediateDataFormat.toAVRO(csv)); + } + + @Override + public void destroy() throws IOException { + avroParquetWriter.close(); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/55d1db2b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsSequenceWriter.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsSequenceWriter.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsSequenceWriter.java index 75c2e7e..dcce861 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsSequenceWriter.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsSequenceWriter.java @@ -23,16 +23,17 @@ import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.sqoop.schema.Schema; import java.io.IOException; -public class HdfsSequenceWriter extends GenericHdfsWriter { +public class HdfsSequenceWriter extends GenericHdfsWriter { private SequenceFile.Writer filewriter; private Text text; @SuppressWarnings("deprecation") - public void initialize(Path filepath, Configuration conf, CompressionCodec codec) throws IOException { + public void initialize(Path filepath, Schema schema, Configuration conf, CompressionCodec codec) throws IOException { if (codec != null) { filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf), conf, filepath, Text.class, NullWritable.class, 
http://git-wip-us.apache.org/repos/asf/sqoop/blob/55d1db2b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsTextWriter.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsTextWriter.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsTextWriter.java index 78cf973..384e330 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsTextWriter.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsTextWriter.java @@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.sqoop.connector.hdfs.HdfsConstants; +import org.apache.sqoop.schema.Schema; import java.io.BufferedWriter; import java.io.DataOutputStream; @@ -34,7 +35,7 @@ public class HdfsTextWriter extends GenericHdfsWriter { private BufferedWriter filewriter; @Override - public void initialize(Path filepath, Configuration conf, CompressionCodec codec) throws IOException { + public void initialize(Path filepath, Schema schema, Configuration conf, CompressionCodec codec) throws IOException { FileSystem fs = filepath.getFileSystem(conf); DataOutputStream filestream = fs.create(filepath, false); http://git-wip-us.apache.org/repos/asf/sqoop/blob/55d1db2b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java index adede3a..cbd555a 100644 --- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java +++ 
b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java @@ -17,9 +17,6 @@ */ package org.apache.sqoop.connector.hdfs; -import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.SEQUENCE_FILE; -import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.TEXT_FILE; - import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; @@ -27,6 +24,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; @@ -35,11 +33,17 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; +import org.apache.parquet.avro.AvroParquetReader; +import org.apache.parquet.format.converter.ParquetMetadataConverter; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.ParquetReader; import org.apache.sqoop.common.MutableMapContext; +import org.apache.sqoop.connector.common.SqoopIDFUtils; import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration; import org.apache.sqoop.connector.hdfs.configuration.ToCompression; import org.apache.sqoop.connector.hdfs.configuration.ToFormat; import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration; +import org.apache.sqoop.connector.idf.AVROIntermediateDataFormat; import org.apache.sqoop.etl.io.DataReader; import org.apache.sqoop.job.etl.Loader; import org.apache.sqoop.job.etl.LoaderContext; @@ -47,13 +51,18 @@ import org.apache.sqoop.schema.Schema; import org.apache.sqoop.schema.type.FixedPoint; import org.apache.sqoop.schema.type.FloatingPoint; import org.apache.sqoop.schema.type.Text; -import org.testng.annotations.AfterMethod; +import org.apache.sqoop.utils.ClassUtils; import 
org.testng.Assert; +import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Factory; import org.testng.annotations.Test; +import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.PARQUET_FILE; +import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.SEQUENCE_FILE; +import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.TEXT_FILE; + public class TestLoader extends TestHdfsBase { private static final String INPUT_ROOT = System.getProperty("maven.build.directory", "/tmp") + "/sqoop/warehouse/"; private static final int NUMBER_OF_ROWS_PER_FILE = 1000; @@ -63,6 +72,7 @@ public class TestLoader extends TestHdfsBase { private final String outputDirectory; private Loader loader; private String user = "test_user"; + private Schema schema; @Factory(dataProvider="test-hdfs-loader") public TestLoader(ToFormat outputFormat, @@ -80,9 +90,10 @@ public class TestLoader extends TestHdfsBase { for (ToCompression compression : new ToCompression[]{ ToCompression.DEFAULT, ToCompression.BZIP2, + ToCompression.GZIP, ToCompression.NONE }) { - for (Object outputFileType : new Object[]{TEXT_FILE, SEQUENCE_FILE}) { + for (Object outputFileType : new Object[]{TEXT_FILE, SEQUENCE_FILE, PARQUET_FILE}) { parameters.add(new Object[]{outputFileType, compression}); } } @@ -100,7 +111,7 @@ public class TestLoader extends TestHdfsBase { @Test public void testLoader() throws Exception { FileSystem fs = FileSystem.get(new Configuration()); - Schema schema = new Schema("schema").addColumn(new FixedPoint("col1", 8L, true)) + schema = new Schema("schema").addColumn(new FixedPoint("col1", 8L, true)) .addColumn(new FloatingPoint("col2", 4L)) .addColumn(new Text("col3")); @@ -130,14 +141,22 @@ public class TestLoader extends TestHdfsBase { assertTestUser(user); return null; } - }, null, user); + }, schema, user); LinkConfiguration linkConf = new LinkConfiguration(); 
ToJobConfiguration jobConf = new ToJobConfiguration(); jobConf.toJobConfig.compression = compression; jobConf.toJobConfig.outputFormat = outputFormat; Path outputPath = new Path(outputDirectory); - loader.load(context, linkConf, jobConf); + try { + loader.load(context, linkConf, jobConf); + } catch (Exception e) { + // we may wait to fail if the compression format selected is not supported by the + // output format + Assert.assertTrue(compressionNotSupported()); + return; + } + Assert.assertEquals(1, fs.listStatus(outputPath).length); for (FileStatus status : fs.listStatus(outputPath)) { @@ -152,10 +171,26 @@ public class TestLoader extends TestHdfsBase { Assert.assertEquals(5, fs.listStatus(outputPath).length); } + private boolean compressionNotSupported() { + switch (outputFormat) { + case SEQUENCE_FILE: + return compression == ToCompression.GZIP; + case PARQUET_FILE: + return compression == ToCompression.BZIP2 || compression == ToCompression.DEFAULT; + } + return false; + } + @Test public void testOverrideNull() throws Exception { + // Parquet supports an actual "null" value so overriding null would not make + // sense here + if (outputFormat == PARQUET_FILE) { + return; + } + FileSystem fs = FileSystem.get(new Configuration()); - Schema schema = new Schema("schema").addColumn(new FixedPoint("col1", 8L, true)) + schema = new Schema("schema").addColumn(new FixedPoint("col1", 8L, true)) .addColumn(new FloatingPoint("col2", 8L)) .addColumn(new Text("col3")) .addColumn(new Text("col4")); @@ -199,7 +234,15 @@ public class TestLoader extends TestHdfsBase { jobConf.toJobConfig.nullValue = "\\N"; Path outputPath = new Path(outputDirectory); - loader.load(context, linkConf, jobConf); + try { + loader.load(context, linkConf, jobConf); + } catch (Exception e) { + // we may wait to fail if the compression format selected is not supported by the + // output format + assert(compressionNotSupported()); + return; + } + Assert.assertEquals(1, fs.listStatus(outputPath).length); 
for (FileStatus status : fs.listStatus(outputPath)) { @@ -214,7 +257,7 @@ public class TestLoader extends TestHdfsBase { Assert.assertEquals(5, fs.listStatus(outputPath).length); } - private void verifyOutput(FileSystem fs, Path file, String format) throws IOException { + private void verifyOutput(FileSystem fs, Path file, String format) throws Exception { Configuration conf = new Configuration(); FSDataInputStream fsin = fs.open(file); CompressionCodec codec; @@ -228,7 +271,9 @@ public class TestLoader extends TestHdfsBase { case BZIP2: Assert.assertTrue(codec.getClass().getCanonicalName().indexOf("BZip2") != -1); break; - + case GZIP: + Assert.assertTrue(codec.getClass().getCanonicalName().indexOf("Gzip") != -1); + break; case DEFAULT: if(org.apache.hadoop.util.VersionInfo.getVersion().matches("\\b1\\.\\d\\.\\d")) { Assert.assertTrue(codec.getClass().getCanonicalName().indexOf("Default") != -1); @@ -283,10 +328,46 @@ public class TestLoader extends TestHdfsBase { line = new org.apache.hadoop.io.Text(); } break; + case PARQUET_FILE: + String compressionCodecClassName = ParquetFileReader.readFooter(conf, file, ParquetMetadataConverter.NO_FILTER).getBlocks().get(0).getColumns().get(0).getCodec().getHadoopCompressionCodecClassName(); + + if (compressionCodecClassName == null) { + codec = null; + } else { + codec = (CompressionCodec) ClassUtils.loadClass(compressionCodecClassName).newInstance(); + } + + // Verify compression + switch(compression) { + case GZIP: + Assert.assertTrue(codec.getClass().getCanonicalName().indexOf("Gzip") != -1); + break; + + case NONE: + default: + Assert.assertNull(codec); + break; + } + + + ParquetReader<GenericRecord> avroParquetReader = AvroParquetReader.builder(file).build(); + AVROIntermediateDataFormat avroIntermediateDataFormat = new AVROIntermediateDataFormat(); + avroIntermediateDataFormat.setSchema(schema); + GenericRecord record; + index = 1; + while ((record = avroParquetReader.read()) != null) { + List<Object> objects = new 
ArrayList<>(); + for (int i = 0; i < record.getSchema().getFields().size(); i++) { + objects.add(record.get(i)); + } + Assert.assertEquals(SqoopIDFUtils.toText(avroIntermediateDataFormat.toCSV(record)), formatRow(format, index++)); + } + + break; } } - private void verifyOutput(FileSystem fs, Path file) throws IOException { + private void verifyOutput(FileSystem fs, Path file) throws Exception { verifyOutput(fs, file, "%d,%f,%s"); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sqoop/blob/55d1db2b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopAvroUtils.java ---------------------------------------------------------------------- diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopAvroUtils.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopAvroUtils.java index 985149c..89bc0f2 100644 --- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopAvroUtils.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopAvroUtils.java @@ -43,7 +43,8 @@ public class SqoopAvroUtils { * Creates an Avro schema from a Sqoop schema. 
*/ public static Schema createAvroSchema(org.apache.sqoop.schema.Schema sqoopSchema) { - String name = sqoopSchema.getName(); + // avro schema names cannot start with quotes, lets just remove them + String name = sqoopSchema.getName().replace("\"", ""); String doc = sqoopSchema.getNote(); String namespace = SQOOP_SCHEMA_NAMESPACE; Schema schema = Schema.createRecord(name, doc, namespace, false); http://git-wip-us.apache.org/repos/asf/sqoop/blob/55d1db2b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java ---------------------------------------------------------------------- diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java index ace1bdf..e409fc1 100644 --- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java @@ -148,7 +148,7 @@ public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRe return jars; } - private GenericRecord toAVRO(String csv) { + public GenericRecord toAVRO(String csv) { String[] csvStringArray = parseCSVString(csv); @@ -175,7 +175,7 @@ public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRe return avroObject; } - private Object toAVRO(String csvString, Column column) { + public Object toAVRO(String csvString, Column column) { Object returnValue = null; switch (column.getType()) { @@ -232,7 +232,7 @@ public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRe return returnValue; } - private GenericRecord toAVRO(Object[] objectArray) { + public GenericRecord toAVRO(Object[] objectArray) { if (objectArray == null) { return null; @@ -311,7 +311,7 @@ public class AVROIntermediateDataFormat extends 
IntermediateDataFormat<GenericRe } @SuppressWarnings("unchecked") - private String toCSV(GenericRecord record) { + public String toCSV(GenericRecord record) { Column[] columns = this.schema.getColumnsArray(); StringBuilder csvString = new StringBuilder(); @@ -387,7 +387,7 @@ public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRe } @SuppressWarnings("unchecked") - private Object[] toObject(GenericRecord record) { + public Object[] toObject(GenericRecord record) { if (record == null) { return null; @@ -459,4 +459,8 @@ public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRe } return object; } + + public Schema getAvroSchema() { + return avroSchema; + } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/55d1db2b/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index cb8a973..ba0a243 100644 --- a/pom.xml +++ b/pom.xml @@ -124,6 +124,7 @@ limitations under the License. <groovy.version>2.4.0</groovy.version> <jansi.version>1.7</jansi.version> <felix.version>2.4.0</felix.version> + <parquet.version>1.8.1</parquet.version> <!-- maven plugin versions --> <maven-assembly-plugin.version>2.6</maven-assembly-plugin.version> </properties> @@ -700,6 +701,16 @@ limitations under the License. 
<artifactId>jetty-servlet</artifactId> <version>${jetty.version}</version> </dependency> + <dependency> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-hadoop</artifactId> + <version>${parquet.version}</version> + </dependency> + <dependency> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-avro</artifactId> + <version>${parquet.version}</version> + </dependency> </dependencies> </dependencyManagement> http://git-wip-us.apache.org/repos/asf/sqoop/blob/55d1db2b/test/pom.xml ---------------------------------------------------------------------- diff --git a/test/pom.xml b/test/pom.xml index 451352a..134bca1 100644 --- a/test/pom.xml +++ b/test/pom.xml @@ -175,6 +175,16 @@ limitations under the License. <artifactId>hadoop-common</artifactId> </dependency> + <dependency> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-hadoop</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-avro</artifactId> + </dependency> + </dependencies> <!-- Add classifier name to the JAR name --> http://git-wip-us.apache.org/repos/asf/sqoop/blob/55d1db2b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/NullValueTest.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/NullValueTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/NullValueTest.java index 3ec4f66..1e8c688 100644 --- a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/NullValueTest.java +++ b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/NullValueTest.java @@ -20,17 +20,27 @@ package org.apache.sqoop.integration.connector.hdfs; import com.google.common.collect.HashMultiset; import com.google.common.collect.Iterables; import com.google.common.collect.Multiset; +import org.apache.avro.generic.GenericRecord; import org.apache.commons.lang.StringUtils; +import 
org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.log4j.Logger; +import org.apache.parquet.avro.AvroParquetReader; +import org.apache.parquet.hadoop.ParquetReader; import org.apache.sqoop.connector.common.SqoopIDFUtils; import org.apache.sqoop.connector.hdfs.configuration.ToFormat; +import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsParquetWriter; +import org.apache.sqoop.connector.idf.AVROIntermediateDataFormat; import org.apache.sqoop.model.MDriverConfig; import org.apache.sqoop.model.MJob; import org.apache.sqoop.model.MLink; +import org.apache.sqoop.schema.Schema; +import org.apache.sqoop.schema.type.DateTime; +import org.apache.sqoop.schema.type.FixedPoint; import org.apache.sqoop.test.asserts.HdfsAsserts; import org.apache.sqoop.test.infrastructure.Infrastructure; import org.apache.sqoop.test.infrastructure.SqoopTestCase; @@ -51,6 +61,7 @@ import org.testng.annotations.Test; import java.sql.Timestamp; import java.util.ArrayList; import java.util.Arrays; +import java.util.LinkedList; import java.util.List; @Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class}) @@ -64,6 +75,9 @@ public class NullValueTest extends SqoopTestCase { // The custom nullValue to use (set to null if default) private String nullValue; + + private Schema sqoopSchema; + @DataProvider(name="nul-value-test") public static Object[][] data(ITestContext context) { String customNullValue = "^&*custom!@"; @@ -80,12 +94,19 @@ public class NullValueTest extends SqoopTestCase { } @Override + public String getTestName() { return methodName + "[" + format.name() + ", " + nullValue + "]"; } @BeforeMethod public void setup() throws Exception { + sqoopSchema = new Schema("cities"); + 
sqoopSchema.addColumn(new FixedPoint("id", Long.valueOf(Integer.SIZE), true)); + sqoopSchema.addColumn(new org.apache.sqoop.schema.type.Text("country")); + sqoopSchema.addColumn(new DateTime("some_date", true, false)); + sqoopSchema.addColumn(new org.apache.sqoop.schema.type.Text("city")); + createTableCities(); } @@ -128,6 +149,27 @@ public class NullValueTest extends SqoopTestCase { } sequenceFileWriter.close(); break; + case PARQUET_FILE: + // Parquet file format does not support using custom null values + if (usingCustomNullValue()) { + return; + } else { + HdfsParquetWriter parquetWriter = new HdfsParquetWriter(); + + Configuration conf = new Configuration(); + FileSystem.setDefaultUri(conf, hdfsClient.getUri()); + + parquetWriter.initialize( + new Path(HdfsUtils.joinPathFragments(getMapreduceDirectory(), "input-0001.parquet")), + sqoopSchema, conf, null); + + for (String line : getCsv()) { + parquetWriter.write(line); + } + + parquetWriter.destroy(); + break; + } default: Assert.fail(); } @@ -166,6 +208,11 @@ public class NullValueTest extends SqoopTestCase { @Test public void testToHdfs() throws Exception { + // Parquet file format does not support using custom null values + if (usingCustomNullValue() && format == ToFormat.PARQUET_FILE) { + return; + } + provider.insertRow(getTableName(), 1, "USA", Timestamp.valueOf("2004-10-23 00:00:00.000"), "San Francisco"); provider.insertRow(getTableName(), 2, "USA", Timestamp.valueOf("2004-10-24 00:00:00.000"), (String) null); provider.insertRow(getTableName(), 3, (String) null, Timestamp.valueOf("2004-10-25 00:00:00.000"), "Brno"); @@ -203,16 +250,16 @@ public class NullValueTest extends SqoopTestCase { executeJob(job); + + Multiset<String> setLines = HashMultiset.create(Arrays.asList(getCsv())); + Path[] files = HdfsUtils.getOutputMapreduceFiles(hdfsClient, HdfsUtils.joinPathFragments(getMapreduceDirectory(), "TO")); + List<String> notFound = new ArrayList<>(); switch (format) { case TEXT_FILE: 
HdfsAsserts.assertMapreduceOutput(hdfsClient, HdfsUtils.joinPathFragments(getMapreduceDirectory(), "TO"), getCsv()); - break; + return; case SEQUENCE_FILE: - Multiset<String> setLines = HashMultiset.create(Arrays.asList(getCsv())); - List<String> notFound = new ArrayList<>(); - Path[] files = HdfsUtils.getOutputMapreduceFiles(hdfsClient, HdfsUtils.joinPathFragments(getMapreduceDirectory(), "TO")); - for(Path file : files) { SequenceFile.Reader.Option optPath = SequenceFile.Reader.file(file); SequenceFile.Reader sequenceFileReader = new SequenceFile.Reader(getHadoopConf(), optPath); @@ -224,17 +271,32 @@ public class NullValueTest extends SqoopTestCase { } } } - if(!setLines.isEmpty() || !notFound.isEmpty()) { - LOG.error("Output do not match expectations."); - LOG.error("Expected lines that weren't present in the files:"); - LOG.error("\t'" + StringUtils.join(setLines, "'\n\t'") + "'"); - LOG.error("Extra lines in files that weren't expected:"); - LOG.error("\t'" + StringUtils.join(notFound, "'\n\t'") + "'"); - Assert.fail("Output do not match expectations."); + break; + case PARQUET_FILE: + AVROIntermediateDataFormat avroIntermediateDataFormat = new AVROIntermediateDataFormat(sqoopSchema); + notFound = new LinkedList<>(); + for (Path file : files) { + ParquetReader<GenericRecord> avroParquetReader = AvroParquetReader.builder(file).build(); + GenericRecord record; + while ((record = avroParquetReader.read()) != null) { + String recordAsCsv = avroIntermediateDataFormat.toCSV(record); + if (!setLines.remove(recordAsCsv)) { + notFound.add(recordAsCsv); + } + } } break; default: Assert.fail(); } + + if(!setLines.isEmpty() || !notFound.isEmpty()) { + LOG.error("Output do not match expectations."); + LOG.error("Expected lines that weren't present in the files:"); + LOG.error("\t'" + StringUtils.join(setLines, "'\n\t'") + "'"); + LOG.error("Extra lines in files that weren't expected:"); + LOG.error("\t'" + StringUtils.join(notFound, "'\n\t'") + "'"); + Assert.fail("Output 
do not match expectations."); + } } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/55d1db2b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/ParquetTest.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/ParquetTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/ParquetTest.java new file mode 100644 index 0000000..222c493 --- /dev/null +++ b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/ParquetTest.java @@ -0,0 +1,183 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.sqoop.integration.connector.hdfs;
+
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Multiset;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
+import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsParquetWriter;
+import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.model.MLink;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.DateTime;
+import org.apache.sqoop.schema.type.FixedPoint;
+import org.apache.sqoop.schema.type.Text;
+import org.apache.sqoop.test.infrastructure.Infrastructure;
+import org.apache.sqoop.test.infrastructure.SqoopTestCase;
+import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider;
+import org.apache.sqoop.test.utils.HdfsUtils;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.Test;
+
+import java.sql.Timestamp;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+
+/**
+ * Integration tests for the HDFS connector's Parquet file support.
+ * Covers both directions: RDBMS to Parquet files on HDFS, and Parquet files on HDFS to RDBMS.
+ */
+@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class})
+public class ParquetTest extends SqoopTestCase {
+
+  /** Drops the test table after each test method. */
+  @AfterMethod
+  public void dropTable() {
+    super.dropTable();
+  }
+
+  /**
+   * Exports the "cities" table into Parquet files on HDFS.
+   * Verifies that every expected row is present exactly once and that no unexpected rows appear.
+   */
+  @Test
+  public void toParquetTest() throws Exception {
+    createAndLoadTableCities();
+
+    // RDBMS link
+    MLink rdbmsConnection = getClient().createLink("generic-jdbc-connector");
+    fillRdbmsLinkConfig(rdbmsConnection);
+    saveLink(rdbmsConnection);
+
+    // HDFS link
+    MLink hdfsConnection = getClient().createLink("hdfs-connector");
+    fillHdfsLink(hdfsConnection);
+    saveLink(hdfsConnection);
+
+    // Job creation
+    MJob job = getClient().createJob(rdbmsConnection.getName(), hdfsConnection.getName());
+
+    // Set rdbms "FROM" config
+    fillRdbmsFromConfig(job, "id");
+
+    // Fill the hdfs "TO" config
+    fillHdfsToConfig(job, ToFormat.PARQUET_FILE);
+
+    saveJob(job);
+    executeJob(job);
+
+    String[] expectedOutput =
+      {"'1','USA','2004-10-23 00:00:00.000','San Francisco'",
+       "'2','USA','2004-10-24 00:00:00.000','Sunnyvale'",
+       "'3','Czech Republic','2004-10-25 00:00:00.000','Brno'",
+       "'4','USA','2004-10-26 00:00:00.000','Palo Alto'"};
+
+    // Multiset so that duplicated output rows are detected, not silently accepted.
+    Multiset<String> setLines = HashMultiset.create(Arrays.asList(expectedOutput));
+
+    List<String> notFound = new LinkedList<>();
+
+    Path[] files = HdfsUtils.getOutputMapreduceFiles(hdfsClient, getMapreduceDirectory());
+    for (Path file : files) {
+      // The reader holds an open HDFS stream; close it even if reading fails.
+      try (ParquetReader<GenericRecord> avroParquetReader = AvroParquetReader.builder(file).build()) {
+        GenericRecord record;
+        while ((record = avroParquetReader.read()) != null) {
+          String recordAsLine = recordToLine(record);
+          if (!setLines.remove(recordAsLine)) {
+            notFound.add(recordAsLine);
+          }
+        }
+      }
+    }
+
+    if (!setLines.isEmpty() || !notFound.isEmpty()) {
+      fail("Output do not match expectations. Expected lines that weren't present in the files: "
+          + setLines + "; extra lines in files that weren't expected: " + notFound);
+    }
+  }
+
+  /**
+   * Writes two Parquet input files to HDFS using {@link HdfsParquetWriter}.
+   * Then runs an HDFS to RDBMS job and verifies that all four rows land in the "cities" table.
+   */
+  @Test
+  public void fromParquetTest() throws Exception {
+    createTableCities();
+
+    Schema sqoopSchema = new Schema("cities");
+    sqoopSchema.addColumn(new FixedPoint("id", Long.valueOf(Integer.SIZE), true));
+    sqoopSchema.addColumn(new Text("country"));
+    sqoopSchema.addColumn(new DateTime("some_date", true, false));
+    sqoopSchema.addColumn(new Text("city"));
+
+    HdfsParquetWriter parquetWriter = new HdfsParquetWriter();
+
+    Configuration conf = new Configuration();
+    FileSystem.setDefaultUri(conf, hdfsClient.getUri());
+
+    parquetWriter.initialize(
+      new Path(HdfsUtils.joinPathFragments(getMapreduceDirectory(), "input-0001.parquet")),
+      sqoopSchema, conf, null);
+
+    parquetWriter.write("1,'USA','2004-10-23 00:00:00.000','San Francisco'");
+    parquetWriter.write("2,'USA','2004-10-24 00:00:00.000','Sunnyvale'");
+
+    parquetWriter.destroy();
+
+    parquetWriter.initialize(
+      new Path(HdfsUtils.joinPathFragments(getMapreduceDirectory(), "input-0002.parquet")),
+      sqoopSchema, conf, null);
+
+    parquetWriter.write("3,'Czech Republic','2004-10-25 00:00:00.000','Brno'");
+    parquetWriter.write("4,'USA','2004-10-26 00:00:00.000','Palo Alto'");
+
+    parquetWriter.destroy();
+
+    // RDBMS link
+    MLink rdbmsLink = getClient().createLink("generic-jdbc-connector");
+    fillRdbmsLinkConfig(rdbmsLink);
+    saveLink(rdbmsLink);
+
+    // HDFS link
+    MLink hdfsLink = getClient().createLink("hdfs-connector");
+    fillHdfsLink(hdfsLink);
+    saveLink(hdfsLink);
+
+    // Job creation
+    MJob job = getClient().createJob(hdfsLink.getName(), rdbmsLink.getName());
+    fillHdfsFromConfig(job);
+    fillRdbmsToConfig(job);
+    saveJob(job);
+
+    executeJob(job);
+    assertEquals(provider.rowCount(getTableName()), 4);
+    assertRowInCities(1, "USA", Timestamp.valueOf("2004-10-23 00:00:00.000"), "San Francisco");
+    assertRowInCities(2, "USA", Timestamp.valueOf("2004-10-24 00:00:00.000"), "Sunnyvale");
+    assertRowInCities(3, "Czech Republic", Timestamp.valueOf("2004-10-25 00:00:00.000"), "Brno");
+    assertRowInCities(4, "USA", Timestamp.valueOf("2004-10-26 00:00:00.000"), "Palo Alto");
+  }
+
+  /**
+   * Renders a "cities" record in the quoted CSV form used by {@code expectedOutput}.
+   * Field 2 is read as epoch milliseconds (cast to {@code Long}).
+   * It is formatted with explicit millisecond precision in the JVM default time zone.
+   * That is the same zone {@link Timestamp#toString()} uses.
+   *
+   * @param genericRecord a record with four positional fields (id, country, date, city)
+   * @return the record as {@code 'id','country','yyyy-MM-dd HH:mm:ss.SSS','city'}
+   */
+  public String recordToLine(GenericRecord genericRecord) {
+    // Explicit pattern instead of appending "00" to Timestamp.toString(), which only
+    // produced ".000" when the fractional seconds happened to be zero.
+    SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+    return "'" + String.valueOf(genericRecord.get(0)) + "',"
+        + "'" + String.valueOf(genericRecord.get(1)) + "',"
+        + "'" + timestampFormat.format(new Timestamp((Long) genericRecord.get(2))) + "',"
+        + "'" + String.valueOf(genericRecord.get(3)) + "'";
+  }
+
+}
