Repository: sqoop
Updated Branches:
  refs/heads/trunk 72c5cd717 -> 11c83f683
SQOOP-3318: Remove Kite dependency from test cases

(Szabolcs Vasas via Boglarka Egyed)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/11c83f68
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/11c83f68
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/11c83f68

Branch: refs/heads/trunk
Commit: 11c83f68386add243762929ecf7f6f25a99efbf4
Parents: 72c5cd7
Author: Boglarka Egyed <[email protected]>
Authored: Fri Apr 27 10:38:10 2018 +0200
Committer: Boglarka Egyed <[email protected]>
Committed: Fri Apr 27 10:38:10 2018 +0200

----------------------------------------------------------------------
 .../org/apache/sqoop/util/FileSystemUtil.java  |  20 +++
 src/test/org/apache/sqoop/TestAllTables.java   |  20 +--
 src/test/org/apache/sqoop/TestMerge.java       |  27 +--
 .../org/apache/sqoop/TestParquetExport.java    |  65 ++++---
 .../org/apache/sqoop/TestParquetImport.java    | 174 ++++++++-----------
 .../org/apache/sqoop/hive/TestHiveImport.java  |  70 ++++----
 .../org/apache/sqoop/util/ParquetReader.java   | 141 +++++++++++++++
 7 files changed, 320 insertions(+), 197 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/11c83f68/src/java/org/apache/sqoop/util/FileSystemUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/util/FileSystemUtil.java b/src/java/org/apache/sqoop/util/FileSystemUtil.java
index 1493e09..96ec212 100644
--- a/src/java/org/apache/sqoop/util/FileSystemUtil.java
+++ b/src/java/org/apache/sqoop/util/FileSystemUtil.java
@@ -19,8 +19,14 @@ package org.apache.sqoop.util;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;

 public final class FileSystemUtil {
   private FileSystemUtil() {
@@ -42,4 +48,18 @@ public final class FileSystemUtil {
     return path.getFileSystem(conf).makeQualified(path);
   }
+
+  public static boolean isFile(Path path, Configuration conf) throws IOException {
+    return path.getFileSystem(conf).isFile(path);
+  }
+
+  public static List<Path> listFiles(Path path, Configuration conf) throws IOException {
+    List<Path> result = new ArrayList<>();
+    FileSystem fileSystem = path.getFileSystem(conf);
+    RemoteIterator<LocatedFileStatus> files = fileSystem.listFiles(path, false);
+    while (files.hasNext()) {
+      result.add(files.next().getPath());
+    }
+    return result;
+  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/11c83f68/src/test/org/apache/sqoop/TestAllTables.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/TestAllTables.java b/src/test/org/apache/sqoop/TestAllTables.java
index 56d1f57..16933a8 100644
--- a/src/test/org/apache/sqoop/TestAllTables.java
+++ b/src/test/org/apache/sqoop/TestAllTables.java
@@ -23,12 +23,12 @@ import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.List;

-import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.sqoop.util.ParquetReader;
 import org.junit.Before;
 import org.junit.After;
@@ -36,10 +36,8 @@ import org.apache.sqoop.testutil.CommonArgs;
 import org.apache.sqoop.testutil.ImportJobTestCase;
 import org.apache.sqoop.tool.ImportAllTablesTool;
 import org.junit.Test;
-import org.kitesdk.data.Dataset;
-import org.kitesdk.data.DatasetReader;
-import org.kitesdk.data.Datasets;

+import static java.util.Collections.singletonList;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
@@ -180,7 +178,6 @@ public class TestAllTables extends ImportJobTestCase {
     int i = 0;
     for (String tableName : this.tableNames) {
       Path tablePath = new Path(warehousePath, tableName);
-      Dataset dataset = Datasets.load("dataset:file:" + tablePath);

       // dequeue the expected value for this table. This
       // list has the same order as the tableNames list.
@@ -188,16 +185,9 @@ public class TestAllTables extends ImportJobTestCase {
           + this.expectedStrings.get(0);
       this.expectedStrings.remove(0);

-      DatasetReader<GenericRecord> reader = dataset.newReader();
-      try {
-        GenericRecord record = reader.next();
-        String line = record.get(0) + "," + record.get(1);
-        assertEquals("Table " + tableName + " expected a different string",
-          expectedVal, line);
-        assertFalse(reader.hasNext());
-      } finally {
-        reader.close();
-      }
+      List<String> result = new ParquetReader(tablePath).readAllInCsv();
+      assertEquals("Table " + tableName + " expected a different string",
+          singletonList(expectedVal), result);
     }
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/11c83f68/src/test/org/apache/sqoop/TestMerge.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/TestMerge.java b/src/test/org/apache/sqoop/TestMerge.java
index 8eef8d4..11806fe 100644
--- a/src/test/org/apache/sqoop/TestMerge.java
+++ b/src/test/org/apache/sqoop/TestMerge.java
@@ -48,13 +48,10 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.sqoop.util.ParquetReader;
 import org.junit.Before;
 import org.junit.Test;
-import org.kitesdk.data.Dataset;
-import org.kitesdk.data.DatasetReader;
-import org.kitesdk.data.Datasets;

-import static org.apache.avro.generic.GenericData.Record;
 import static org.junit.Assert.fail;

 /**
@@ -298,21 +295,11 @@ public class TestMerge extends BaseSqoopTestCase {
     return false;
   }

-  private boolean checkParquetFileForLine(FileSystem fileSystem, Path path, List<Integer> record) throws IOException
-  {
-    Dataset<Record> parquetRecords = Datasets.load("dataset:" + path.getParent(), Record.class);
-    DatasetReader<Record> datasetReader = null;
-    try {
-      datasetReader = parquetRecords.newReader();
-      for (GenericRecord genericRecord : datasetReader) {
-        if (valueMatches(genericRecord, record)) {
-          return true;
-        }
-      }
-    }
-    finally {
-      if (datasetReader != null) {
-        datasetReader.close();
+  private boolean checkParquetFileForLine(Path path, List<Integer> record) throws IOException {
+    List<GenericRecord> resultRecords = new ParquetReader(path.getParent()).readAll();
+    for (GenericRecord resultRecord : resultRecords) {
+      if (valueMatches(resultRecord, record)) {
+        return true;
       }
     }

@@ -330,7 +317,7 @@ public class TestMerge extends BaseSqoopTestCase {
         result = checkAvroFileForLine(fs, p, record);
         break;
       case ParquetFile:
-        result = checkParquetFileForLine(fs, p, record);
+        result = checkParquetFileForLine(p, record);
         break;
     }
     return result;
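
For reviewers unfamiliar with the new helpers: the converted tests above lean on the two new FileSystemUtil methods, where isFile() distinguishes the single-file case and listFiles() does a non-recursive listing of a directory. A minimal usage sketch (the path is illustrative, not taken from the patch):

    // Illustrative only: enumerate the Parquet part files under a table directory.
    Configuration conf = new Configuration();
    Path tableDir = new Path("/tmp/sqoop-warehouse/FOO");          // hypothetical output dir
    if (!FileSystemUtil.isFile(tableDir, conf)) {
      for (Path file : FileSystemUtil.listFiles(tableDir, conf)) { // non-recursive listing
        System.out.println(file);
      }
    }
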
http://git-wip-us.apache.org/repos/asf/sqoop/blob/11c83f68/src/test/org/apache/sqoop/TestParquetExport.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/TestParquetExport.java b/src/test/org/apache/sqoop/TestParquetExport.java
index c8bb663..43dabb5 100644
--- a/src/test/org/apache/sqoop/TestParquetExport.java
+++ b/src/test/org/apache/sqoop/TestParquetExport.java
@@ -18,6 +18,10 @@ package org.apache.sqoop;

+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.sqoop.testutil.ExportJobTestCase;
 import com.google.common.collect.Lists;
 import org.apache.avro.Schema;
@@ -28,7 +32,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
-import org.kitesdk.data.*;
+import parquet.avro.AvroParquetWriter;

 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -39,9 +43,13 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.UUID;

 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
+import static parquet.hadoop.ParquetWriter.DEFAULT_PAGE_SIZE;
+import static parquet.hadoop.metadata.CompressionCodecName.SNAPPY;

 /**
@@ -121,30 +129,45 @@ public class TestParquetExport extends ExportJobTestCase {

   /**
    * Create a data file that gets exported to the db.
-   * @param fileNum the number of the file (for multi-file export)
+   * Sqoop uses Kite to export Parquet files so it requires a Kite metadata directory to be present next to the files
+   * but since we do not use Kite in our test cases anymore we generate the .metadata directory here.
    * @param numRecords how many records to write to the file.
    */
-  protected void createParquetFile(int fileNum, int numRecords,
+  protected void createParquetFile(int numRecords,
       ColumnGenerator... extraCols) throws IOException {
-    String uri = "dataset:file:" + getTablePath();
     Schema schema = buildSchema(extraCols);
-    DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
-      .schema(schema)
-      .format(Formats.PARQUET)
-      .build();
-    Dataset dataset = Datasets.create(uri, descriptor);
-    DatasetWriter writer = dataset.newWriter();
-    try {
+
+    createMetadataDir(schema);
+    String fileName = UUID.randomUUID().toString() + ".parquet";
+    Path filePath = new Path(getTablePath(), fileName);
+    try (AvroParquetWriter parquetWriter = new AvroParquetWriter(filePath, schema, SNAPPY, DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE)) {
       for (int i = 0; i < numRecords; i++) {
         GenericRecord record = new GenericData.Record(schema);
         record.put("id", i);
         record.put("msg", getMsgPrefix() + i);
         addExtraColumns(record, i, extraCols);
-        writer.write(record);
+        parquetWriter.write(record);
       }
-    } finally {
-      writer.close();
+    }
+  }
+
+  private void createMetadataDir(Schema schema) throws IOException {
+    final String descriptorFileTemplate = "location=file\\:%s\n" +
+        " version=1\n" +
+        " compressionType=snappy\n" +
+        " format=parquet\n";
+    Path metadataDirPath = new Path(getTablePath(), ".metadata");
+    Path schemaFile = new Path(metadataDirPath, "schema.avsc");
+    Path descriptorFile = new Path(metadataDirPath, "descriptor.properties");
+    FileSystem fileSystem = getTablePath().getFileSystem(new Configuration());
+    fileSystem.mkdirs(metadataDirPath);
+
+    try (FSDataOutputStream fileOs = fileSystem.create(schemaFile)) {
+      fileOs.write(schema.toString().getBytes());
+    }
+    try (FSDataOutputStream fileOs = fileSystem.create(descriptorFile)) {
+      fileOs.write(String.format(descriptorFileTemplate, getTablePath()).getBytes());
     }
   }

@@ -352,7 +375,7 @@ public class TestParquetExport extends ExportJobTestCase {
       colGenerator(new GenericData.EnumSymbol(enumeration, "a"), enumeration, "a", "VARCHAR(8)"),
     };
-    createParquetFile(0, TOTAL_RECORDS, gens);
+    createParquetFile(TOTAL_RECORDS, gens);
     createTable(gens);
     runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1)));
     verifyExport(TOTAL_RECORDS);
@@ -372,7 +395,7 @@ public class TestParquetExport extends ExportJobTestCase {
     Schema schema = Schema.createUnion(childSchemas);
     ColumnGenerator gen0 = colGenerator(null, schema, null, "VARCHAR(64)");
     ColumnGenerator gen1 = colGenerator("s", schema, "s", "VARCHAR(64)");
-    createParquetFile(0, TOTAL_RECORDS, gen0, gen1);
+    createParquetFile(TOTAL_RECORDS, gen0, gen1);
     createTable(gen0, gen1);
     runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1)));
     verifyExport(TOTAL_RECORDS);
@@ -392,7 +415,7 @@ public class TestParquetExport extends ExportJobTestCase {
     record.put("myint", 100);
     // DB type is not used so can be anything:
     ColumnGenerator gen = colGenerator(record, schema, null, "VARCHAR(64)");
-    createParquetFile(0, TOTAL_RECORDS, gen);
+    createParquetFile(TOTAL_RECORDS, gen);
     createTable(gen);

     thrown.expect(Exception.class);
@@ -409,7 +432,7 @@ public class TestParquetExport extends ExportJobTestCase {
     // the Parquet value will not be exported
     ColumnGenerator gen = colGenerator(100, Schema.create(Schema.Type.INT), null, null);
-    createParquetFile(0, TOTAL_RECORDS, gen);
+    createParquetFile(TOTAL_RECORDS, gen);
     createTable(gen);
     runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1)));
     verifyExport(TOTAL_RECORDS);
@@ -419,7 +442,7 @@
   public void testParquetWithUpdateKey() throws IOException, SQLException {
     String[] argv = { "--update-key", "ID" };
     final int TOTAL_RECORDS = 1;
-    createParquetFile(0, TOTAL_RECORDS);
+    createParquetFile(TOTAL_RECORDS);
     createTableWithInsert();
     runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1)));
     verifyExport(getMsgPrefix() + "0");
@@ -432,7 +455,7 @@
     final int TOTAL_RECORDS = 2;
     // ColumnGenerator gen = colGenerator("100",
     // Schema.create(Schema.Type.STRING), null, "VARCHAR(64)");
-    createParquetFile(0, TOTAL_RECORDS);
+    createParquetFile(TOTAL_RECORDS);
     createTableWithInsert();

     thrown.expect(Exception.class);
@@ -447,7 +470,7 @@
     // null Parquet schema means don't create an Parquet field
     ColumnGenerator gen = colGenerator(null, null, null, "VARCHAR(64)");
-    createParquetFile(0, TOTAL_RECORDS, gen);
+    createParquetFile(TOTAL_RECORDS, gen);
     createTable(gen);

     thrown.expect(Exception.class);
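
A note on createMetadataDir() above: Kite still expects a .metadata directory next to the exported files, so the test writes the Avro schema to .metadata/schema.avsc and a small descriptor to .metadata/descriptor.properties. Given the template string above, the generated descriptor looks roughly like this (the location path is a placeholder, not taken from the patch):

    location=file\:/path/to/warehouse/TABLE_DIR
     version=1
     compressionType=snappy
     format=parquet
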
http://git-wip-us.apache.org/repos/asf/sqoop/blob/11c83f68/src/test/org/apache/sqoop/TestParquetImport.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/TestParquetImport.java b/src/test/org/apache/sqoop/TestParquetImport.java
index 379529a..0f9c7f3 100644
--- a/src/test/org/apache/sqoop/TestParquetImport.java
+++ b/src/test/org/apache/sqoop/TestParquetImport.java
@@ -18,6 +18,8 @@ package org.apache.sqoop;

+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.sqoop.testutil.CommonArgs;
 import org.apache.sqoop.testutil.HsqldbTestServer;
 import org.apache.sqoop.testutil.ImportJobTestCase;
@@ -28,11 +30,12 @@ import org.apache.avro.Schema.Type;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.sqoop.util.ParquetReader;
 import org.junit.Test;
-import org.kitesdk.data.CompressionType;
-import org.kitesdk.data.Dataset;
-import org.kitesdk.data.DatasetReader;
-import org.kitesdk.data.Datasets;
+import parquet.format.CompressionCodec;
+import parquet.hadoop.Footer;
+import parquet.hadoop.ParquetFileReader;
+import parquet.hadoop.metadata.ParquetMetadata;

 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -42,7 +45,6 @@ import java.util.Arrays;
 import java.util.List;

 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -119,10 +121,16 @@ public class TestParquetImport extends ImportJobTestCase {

   @Test
   public void testDeflateCompression() throws IOException {
-    runParquetImportTest("deflate");
+    // The current Kite-based Parquet writing implementation uses GZIP compression codec when Deflate is specified.
+    // See: org.kitesdk.data.spi.filesystem.ParquetAppender.getCompressionCodecName()
+    runParquetImportTest("deflate", "gzip");
   }

   private void runParquetImportTest(String codec) throws IOException {
+    runParquetImportTest(codec, codec);
+  }
+
+  private void runParquetImportTest(String codec, String expectedCodec) throws IOException {
     String[] types = {"BIT", "INTEGER", "BIGINT", "REAL", "DOUBLE", "VARCHAR(6)", "VARBINARY(2)",};
     String[] vals = {"true", "100", "200", "1.0", "2.0", "'s'", "'0102'", };
@@ -131,7 +139,7 @@ public class TestParquetImport extends ImportJobTestCase {
     String [] extraArgs = { "--compression-codec", codec};
     runImport(getOutputArgv(true, extraArgs));

-    assertEquals(CompressionType.forName(codec), getCompressionType());
+    assertEquals(expectedCodec.toUpperCase(), getCompressionType());

     Schema schema = getSchema();
     assertEquals(Type.RECORD, schema.getType());
@@ -145,25 +153,21 @@ public class TestParquetImport extends ImportJobTestCase {
     checkField(fields.get(5), "DATA_COL5", Type.STRING);
     checkField(fields.get(6), "DATA_COL6", Type.BYTES);

-    DatasetReader<GenericRecord> reader = getReader();
-    try {
-      GenericRecord record1 = reader.next();
-      assertNotNull(record1);
-      assertEquals("DATA_COL0", true, record1.get("DATA_COL0"));
-      assertEquals("DATA_COL1", 100, record1.get("DATA_COL1"));
-      assertEquals("DATA_COL2", 200L, record1.get("DATA_COL2"));
-      assertEquals("DATA_COL3", 1.0f, record1.get("DATA_COL3"));
-      assertEquals("DATA_COL4", 2.0, record1.get("DATA_COL4"));
-      assertEquals("DATA_COL5", "s", record1.get("DATA_COL5"));
-      Object object = record1.get("DATA_COL6");
-      assertTrue(object instanceof ByteBuffer);
-      ByteBuffer b = ((ByteBuffer) object);
-      assertEquals((byte) 1, b.get(0));
-      assertEquals((byte) 2, b.get(1));
-      assertFalse(reader.hasNext());
-    } finally {
-      reader.close();
-    }
+    List<GenericRecord> genericRecords = new ParquetReader(getTablePath()).readAll();
+    GenericRecord record1 = genericRecords.get(0);
+    assertNotNull(record1);
+    assertEquals("DATA_COL0", true, record1.get("DATA_COL0"));
+    assertEquals("DATA_COL1", 100, record1.get("DATA_COL1"));
+    assertEquals("DATA_COL2", 200L, record1.get("DATA_COL2"));
+    assertEquals("DATA_COL3", 1.0f, record1.get("DATA_COL3"));
+    assertEquals("DATA_COL4", 2.0, record1.get("DATA_COL4"));
+    assertEquals("DATA_COL5", "s", record1.get("DATA_COL5"));
+    Object object = record1.get("DATA_COL6");
+    assertTrue(object instanceof ByteBuffer);
+    ByteBuffer b = ((ByteBuffer) object);
+    assertEquals((byte) 1, b.get(0));
+    assertEquals((byte) 2, b.get(1));
+    assertEquals(1, genericRecords.size());
   }

   @Test
@@ -181,15 +185,10 @@ public class TestParquetImport extends ImportJobTestCase {
     assertEquals(types.length, fields.size());
     checkField(fields.get(0), "DATA_COL0", Type.STRING);

-    DatasetReader<GenericRecord> reader = getReader();
-    try {
-      assertTrue(reader.hasNext());
-      GenericRecord record1 = reader.next();
-      assertEquals("DATA_COL0", "10", record1.get("DATA_COL0"));
-      assertFalse(reader.hasNext());
-    } finally {
-      reader.close();
-    }
+    List<GenericRecord> genericRecords = new ParquetReader(getTablePath()).readAll();
+    GenericRecord record1 = genericRecords.get(0);
+    assertEquals("DATA_COL0", "10", record1.get("DATA_COL0"));
+    assertEquals(1, genericRecords.size());
   }

   @Test
@@ -207,15 +206,10 @@ public class TestParquetImport extends ImportJobTestCase {
     assertEquals(types.length, fields.size());
     checkField(fields.get(0), "__NAME", Type.INT);

-    DatasetReader<GenericRecord> reader = getReader();
-    try {
-      assertTrue(reader.hasNext());
-      GenericRecord record1 = reader.next();
-      assertEquals("__NAME", 1987, record1.get("__NAME"));
-      assertFalse(reader.hasNext());
-    } finally {
-      reader.close();
-    }
+    List<GenericRecord> genericRecords = new ParquetReader(getTablePath()).readAll();
+    GenericRecord record1 = genericRecords.get(0);
+    assertEquals("__NAME", 1987, record1.get("__NAME"));
+    assertEquals(1, genericRecords.size());
   }

   @Test
@@ -233,15 +227,10 @@ public class TestParquetImport extends ImportJobTestCase {
     assertEquals(types.length, fields.size());
     checkField(fields.get(0), "TEST_P_A_R_QUET", Type.INT);

-    DatasetReader<GenericRecord> reader = getReader();
-    try {
-      assertTrue(reader.hasNext());
-      GenericRecord record1 = reader.next();
-      assertEquals("TEST_P_A_R_QUET", 2015, record1.get("TEST_P_A_R_QUET"));
-      assertFalse(reader.hasNext());
-    } finally {
-      reader.close();
-    }
+    List<GenericRecord> genericRecords = new ParquetReader(getTablePath()).readAll();
+    GenericRecord record1 = genericRecords.get(0);
+    assertEquals("TEST_P_A_R_QUET", 2015, record1.get("TEST_P_A_R_QUET"));
+    assertEquals(1, genericRecords.size());
   }

   @Test
@@ -252,15 +241,10 @@ public class TestParquetImport extends ImportJobTestCase {

     runImport(getOutputArgv(true, null));

-    DatasetReader<GenericRecord> reader = getReader();
-    try {
-      assertTrue(reader.hasNext());
-      GenericRecord record1 = reader.next();
-      assertNull(record1.get("DATA_COL0"));
-      assertFalse(reader.hasNext());
-    } finally {
-      reader.close();
-    }
+    List<GenericRecord> genericRecords = new ParquetReader(getTablePath()).readAll();
+    GenericRecord record1 = genericRecords.get(0);
+    assertNull(record1.get("DATA_COL0"));
+    assertEquals(1, genericRecords.size());
   }

   @Test
@@ -271,15 +255,10 @@ public class TestParquetImport extends ImportJobTestCase {

     runImport(getOutputQueryArgv(true, null));

-    DatasetReader<GenericRecord> reader = getReader();
-    try {
-      assertTrue(reader.hasNext());
-      GenericRecord record1 = reader.next();
-      assertEquals(1, record1.get("DATA_COL0"));
-      assertFalse(reader.hasNext());
-    } finally {
-      reader.close();
-    }
+    List<GenericRecord> genericRecords = new ParquetReader(getTablePath()).readAll();
+    GenericRecord record1 = genericRecords.get(0);
+    assertEquals(1, record1.get("DATA_COL0"));
+    assertEquals(1, genericRecords.size());
   }

   @Test
@@ -291,17 +270,12 @@ public class TestParquetImport extends ImportJobTestCase {
     runImport(getOutputArgv(true, null));
     runImport(getOutputArgv(true, new String[]{"--append"}));

-    DatasetReader<GenericRecord> reader = getReader();
-    try {
-      assertTrue(reader.hasNext());
-      GenericRecord record1 = reader.next();
-      assertEquals(1, record1.get("DATA_COL0"));
-      record1 = reader.next();
-      assertEquals(1, record1.get("DATA_COL0"));
-      assertFalse(reader.hasNext());
-    } finally {
-      reader.close();
-    }
+    List<GenericRecord> genericRecords = new ParquetReader(getTablePath()).readAll();
+    GenericRecord record1 = genericRecords.get(0);
+    assertEquals(1, record1.get("DATA_COL0"));
+    record1 = genericRecords.get(1);
+    assertEquals(1, record1.get("DATA_COL0"));
+    assertEquals(2, genericRecords.size());
   }

   @Test
@@ -319,30 +293,26 @@ public class TestParquetImport extends ImportJobTestCase {
     }
   }

-  private CompressionType getCompressionType() {
-    return getDataset().getDescriptor().getCompressionType();
-  }
-
-  private Schema getSchema() {
-    return getDataset().getDescriptor().getSchema();
-  }
-
-  private DatasetReader<GenericRecord> getReader() {
-    return getDataset().newReader();
+  private String getCompressionType() {
+    ParquetMetadata parquetMetadata = getOutputMetadata();
+    CompressionCodec parquetCompressionCodec = parquetMetadata.getBlocks().get(0).getColumns().get(0).getCodec().getParquetCompressionCodec();
+    return parquetCompressionCodec.name();
   }

-  private Dataset<GenericRecord> getDataset() {
-    String uri = "dataset:file:" + getTablePath();
-    return Datasets.load(uri, GenericRecord.class);
+  private ParquetMetadata getOutputMetadata() {
+    try {
+      Configuration config = new Configuration();
+      FileStatus fileStatus = getTablePath().getFileSystem(config).getFileStatus(getTablePath());
+      List<Footer> footers = ParquetFileReader.readFooters(config, fileStatus, false);
+      return footers.get(0).getParquetMetadata();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
   }

-  @Override
-  public void tearDown() {
-    super.tearDown();
-    String uri = "dataset:file:" + getTablePath();
-    if (Datasets.exists(uri)) {
-      Datasets.delete(uri);
-    }
+  private Schema getSchema() {
+    String schemaString = getOutputMetadata().getFileMetaData().getKeyValueMetaData().get("parquet.avro.schema");
+    return new Schema.Parser().parse(schemaString);
   }

   private void checkField(Field field, String name, Type type) {
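
To see how the new footer-based helpers in TestParquetImport fit together, here is a condensed, illustrative sketch of the same calls outside the test class; the output path is hypothetical and the API calls simply mirror getOutputMetadata(), getCompressionType() and getSchema() above:

    // Illustrative only: inspect codec and Avro schema straight from the Parquet footer.
    Configuration conf = new Configuration();
    Path outputDir = new Path("/tmp/sqoop-import/FOO");                       // assumed location
    FileStatus status = outputDir.getFileSystem(conf).getFileStatus(outputDir);
    List<Footer> footers = ParquetFileReader.readFooters(conf, status, false);
    ParquetMetadata metadata = footers.get(0).getParquetMetadata();
    // Codec of the first column chunk, e.g. "GZIP" when --compression-codec deflate was used
    String codec = metadata.getBlocks().get(0).getColumns().get(0)
        .getCodec().getParquetCompressionCodec().name();
    // Avro schema that the import stored in the footer's key/value metadata
    Schema schema = new Schema.Parser().parse(
        metadata.getFileMetaData().getKeyValueMetaData().get("parquet.avro.schema"));
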
http://git-wip-us.apache.org/repos/asf/sqoop/blob/11c83f68/src/test/org/apache/sqoop/hive/TestHiveImport.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/hive/TestHiveImport.java b/src/test/org/apache/sqoop/hive/TestHiveImport.java
index 4e1f249..bc19b69 100644
--- a/src/test/org/apache/sqoop/hive/TestHiveImport.java
+++ b/src/test/org/apache/sqoop/hive/TestHiveImport.java
@@ -23,14 +23,12 @@ import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;

 import org.apache.sqoop.Sqoop;
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
-import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -38,6 +36,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.sqoop.avro.AvroSchemaMismatchException;
 import org.apache.sqoop.mapreduce.ParquetJob;
+import org.apache.sqoop.util.ParquetReader;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -54,11 +53,8 @@ import org.apache.sqoop.tool.ImportTool;
 import org.apache.sqoop.tool.SqoopTool;
 import org.apache.commons.cli.ParseException;
 import org.junit.rules.ExpectedException;
-import org.kitesdk.data.Dataset;
-import org.kitesdk.data.DatasetReader;
-import org.kitesdk.data.Datasets;
-import org.kitesdk.data.Formats;

+import static java.util.Collections.sort;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -301,43 +297,39 @@ public class TestHiveImport extends ImportJobTestCase {
     runImportTest(TABLE_NAME, types, vals, "", getArgv(false, extraArgs),
         new ImportTool());

-    verifyHiveDataset(TABLE_NAME, new Object[][]{{"test", 42, "somestring"}});
+    verifyHiveDataset(new Object[][]{{"test", 42, "somestring"}});
   }

-  private void verifyHiveDataset(String tableName, Object[][] valsArray) {
-    String datasetUri = String.format("dataset:hive:default/%s",
-        tableName.toLowerCase());
-    assertTrue(Datasets.exists(datasetUri));
-    Dataset dataset = Datasets.load(datasetUri);
-    assertFalse(dataset.isEmpty());
+  private void verifyHiveDataset(Object[][] valsArray) {
+    List<String> expected = getExpectedLines(valsArray);
+    List<String> result = new ParquetReader(getTablePath()).readAllInCsv();

-    DatasetReader<GenericRecord> reader = dataset.newReader();
-    try {
-      List<String> expectations = new ArrayList<String>();
-      if (valsArray != null) {
-        for (Object[] vals : valsArray) {
-          expectations.add(Arrays.toString(vals));
-        }
-      }
+    sort(expected);
+    sort(result);
+
+    assertEquals(expected, result);
+  }

-      while (reader.hasNext() && expectations.size() > 0) {
-        String actual = Arrays.toString(
-            convertGenericRecordToArray(reader.next()));
-        assertTrue("Expect record: " + actual, expectations.remove(actual));
+  private List<String> getExpectedLines(Object[][] valsArray) {
+    List<String> expectations = new ArrayList<>();
+    if (valsArray != null) {
+      for (Object[] vals : valsArray) {
+        expectations.add(toCsv(vals));
       }
-      assertFalse(reader.hasNext());
-      assertEquals(0, expectations.size());
-    } finally {
-      reader.close();
     }
+    return expectations;
   }

-  private static Object[] convertGenericRecordToArray(GenericRecord record) {
-    Object[] result = new Object[record.getSchema().getFields().size()];
-    for (int i = 0; i < result.length; i++) {
-      result[i] = record.get(i);
+  private String toCsv(Object[] vals) {
+    StringBuilder result = new StringBuilder();
+
+    for (Object val : vals) {
+      result.append(val).append(",");
     }
-    return result;
+
+    result.deleteCharAt(result.length() - 1);
+
+    return result.toString();
   }

   /** Test that table is created in hive with no data import. */
@@ -388,13 +380,13 @@ public class TestHiveImport extends ImportJobTestCase {
     ImportTool tool = new ImportTool();
     runImportTest(TABLE_NAME, types, vals, "", getArgv(false, extraArgs), tool);

-    verifyHiveDataset(TABLE_NAME, new Object[][]{{"test", 42, "somestring"}});
+    verifyHiveDataset(new Object[][]{{"test", 42, "somestring"}});

     String [] valsToOverwrite = { "'test2'", "24", "'somestring2'" };
     String [] extraArgsForOverwrite = {"--as-parquetfile", "--hive-overwrite"};
     runImportTest(TABLE_NAME, types, valsToOverwrite, "",
         getArgv(false, extraArgsForOverwrite), tool);
-    verifyHiveDataset(TABLE_NAME, new Object[][] {{"test2", 24, "somestring2"}});
+    verifyHiveDataset(new Object[][] {{"test2", 24, "somestring2"}});
   }

   @Test
@@ -430,7 +422,7 @@ public class TestHiveImport extends ImportJobTestCase {
         .name(getColName(2)).type().nullable().stringType().noDefault()
         .endRecord();
     String dataSetUri = "dataset:hive:/default/" + tableName;
-    ParquetJob.createDataset(dataSetSchema, Formats.PARQUET.getDefaultCompressionType(), dataSetUri);
+    ParquetJob.createDataset(dataSetSchema, ParquetJob.getCompressionType(new Configuration()), dataSetUri);
   }

   /**
@@ -448,11 +440,11 @@ public class TestHiveImport extends ImportJobTestCase {
     ImportTool tool = new ImportTool();
     runImportTest(TABLE_NAME, types, vals, "", args, tool);

-    verifyHiveDataset(TABLE_NAME, new Object[][]{{"test", 42, "somestring"}});
+    verifyHiveDataset(new Object[][]{{"test", 42, "somestring"}});

     String [] valsToAppend = { "'test2'", "4242", "'somestring2'" };
     runImportTest(TABLE_NAME, types, valsToAppend, "", args, tool);
-    verifyHiveDataset(TABLE_NAME, new Object[][] {
+    verifyHiveDataset(new Object[][] {
         {"test2", 4242, "somestring2"},
         {"test", 42, "somestring"}});
   }
http://git-wip-us.apache.org/repos/asf/sqoop/blob/11c83f68/src/test/org/apache/sqoop/util/ParquetReader.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/util/ParquetReader.java b/src/test/org/apache/sqoop/util/ParquetReader.java
new file mode 100644
index 0000000..56e03a0
--- /dev/null
+++ b/src/test/org/apache/sqoop/util/ParquetReader.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.util;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import parquet.avro.AvroParquetReader;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.List;
+
+import static org.apache.sqoop.util.FileSystemUtil.isFile;
+import static org.apache.sqoop.util.FileSystemUtil.listFiles;
+
+public class ParquetReader implements AutoCloseable {
+
+  private final Path pathToRead;
+
+  private final Configuration configuration;
+
+  private final Deque<Path> filesToRead;
+
+  private parquet.hadoop.ParquetReader<GenericRecord> reader;
+
+  public ParquetReader(Path pathToRead, Configuration configuration) {
+    this.pathToRead = pathToRead;
+    this.configuration = configuration;
+    this.filesToRead = new ArrayDeque<>(determineFilesToRead());
+    initReader(filesToRead.removeFirst());
+  }
+
+  public ParquetReader(Path pathToRead) {
+    this(pathToRead, new Configuration());
+  }
+
+  public GenericRecord next() throws IOException {
+    GenericRecord result = reader.read();
+    if (result != null) {
+      return result;
+    }
+    if (!filesToRead.isEmpty()) {
+      initReader(filesToRead.removeFirst());
+      return next();
+    }
+
+    return null;
+  }
+
+  public List<GenericRecord> readAll() {
+    List<GenericRecord> result = new ArrayList<>();
+
+    GenericRecord record;
+    try {
+      while ((record = next()) != null) {
+        result.add(record);
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    } finally {
+      close();
+    }
+
+    return result;
+  }
+
+  public List<String> readAllInCsv() {
+    List<String> result = new ArrayList<>();
+
+    for (GenericRecord record : readAll()) {
+      result.add(convertToCsv(record));
+    }
+
+    return result;
+  }
+
+  private String convertToCsv(GenericRecord record) {
+    StringBuilder result = new StringBuilder();
+    for (int i = 0; i < record.getSchema().getFields().size(); i++) {
+      result.append(record.get(i));
+      result.append(",");
+    }
+    result.deleteCharAt(result.length() - 1);
+    return result.toString();
+  }
+
+  private void initReader(Path file) {
+    try {
+      if (reader != null) {
+        reader.close();
+      }
+      this.reader = AvroParquetReader.<GenericRecord>builder(file).build();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private Collection<Path> determineFilesToRead() {
+    try {
+      if (isFile(pathToRead, configuration)) {
+        return Collections.singletonList(pathToRead);
+      }
+
+      return listFiles(pathToRead, configuration);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void close() {
+    if (reader != null) {
+      try {
+        reader.close();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+}
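
Since every converted test goes through the new ParquetReader, a short orientation: the constructor accepts either a single .parquet file or a directory (listed non-recursively via FileSystemUtil.listFiles()), readAll()/readAllInCsv() drain and then close the underlying parquet.hadoop.ParquetReader, and next() transparently rolls over to the next file. A minimal sketch of iterating manually instead of materialising everything, with an illustrative path that is not taken from the patch:

    // Illustrative caller, not part of the patch: stream records one at a time.
    // next() throws IOException, so a real caller would declare or handle it.
    try (ParquetReader reader = new ParquetReader(new Path("/tmp/sqoop-import/FOO"))) {
      GenericRecord record;
      while ((record = reader.next()) != null) { // next() advances across part files
        System.out.println(record);
      }
    }
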
