Repository: sqoop
Updated Branches:
  refs/heads/trunk f4f954301 -> 8e45d2b38
http://git-wip-us.apache.org/repos/asf/sqoop/blob/8e45d2b3/src/test/org/apache/sqoop/tool/TestBaseSqoopTool.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/tool/TestBaseSqoopTool.java b/src/test/org/apache/sqoop/tool/TestBaseSqoopTool.java
index 01ad150..dbda8b7 100644
--- a/src/test/org/apache/sqoop/tool/TestBaseSqoopTool.java
+++ b/src/test/org/apache/sqoop/tool/TestBaseSqoopTool.java
@@ -18,15 +18,21 @@
 package org.apache.sqoop.tool;
 
+import org.apache.commons.cli.CommandLine;
 import org.apache.sqoop.SqoopOptions;
+import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import org.mockito.Mockito;
 
+import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.HADOOP;
+import static org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorImplementation.KITE;
 import static org.hamcrest.CoreMatchers.sameInstance;
+import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class TestBaseSqoopTool {
@@ -35,11 +41,13 @@ public class TestBaseSqoopTool {
 
   private BaseSqoopTool testBaseSqoopTool;
   private SqoopOptions testSqoopOptions;
+  private CommandLine mockCommandLine;
 
   @Before
   public void setup() {
     testBaseSqoopTool = mock(BaseSqoopTool.class, Mockito.CALLS_REAL_METHODS);
     testSqoopOptions = new SqoopOptions();
+    mockCommandLine = mock(CommandLine.class);
   }
 
   @Test
@@ -69,4 +77,61 @@ public class TestBaseSqoopTool {
     testBaseSqoopTool.rethrowIfRequired(testSqoopOptions, expectedCauseException);
   }
 
+  @Test
+  public void testApplyCommonOptionsSetsParquetJobConfigurationImplementationFromCommandLine() throws Exception {
+    ParquetJobConfiguratorImplementation expectedValue = HADOOP;
+
+    when(mockCommandLine.getOptionValue("parquet-configurator-implementation")).thenReturn(expectedValue.toString());
+
+    testBaseSqoopTool.applyCommonOptions(mockCommandLine, testSqoopOptions);
+
+    assertEquals(expectedValue, testSqoopOptions.getParquetConfiguratorImplementation());
+  }
+
+  @Test
+  public void testApplyCommonOptionsSetsParquetJobConfigurationImplementationFromCommandLineCaseInsensitively() throws Exception {
+    String hadoopImplementationLowercase = "haDooP";
+
+    when(mockCommandLine.getOptionValue("parquet-configurator-implementation")).thenReturn(hadoopImplementationLowercase);
+
+    testBaseSqoopTool.applyCommonOptions(mockCommandLine, testSqoopOptions);
+
+    assertEquals(HADOOP, testSqoopOptions.getParquetConfiguratorImplementation());
+  }
+
+  @Test
+  public void testApplyCommonOptionsSetsParquetJobConfigurationImplementationFromConfiguration() throws Exception {
+    ParquetJobConfiguratorImplementation expectedValue = HADOOP;
+    testSqoopOptions.getConf().set("parquetjob.configurator.implementation", expectedValue.toString());
+
+    testBaseSqoopTool.applyCommonOptions(mockCommandLine, testSqoopOptions);
+
+    assertEquals(expectedValue, testSqoopOptions.getParquetConfiguratorImplementation());
+  }
+
+  @Test
+  public void testApplyCommonOptionsPrefersParquetJobConfigurationImplementationFromCommandLine() throws Exception {
+    ParquetJobConfiguratorImplementation expectedValue = HADOOP;
+    testSqoopOptions.getConf().set("parquetjob.configurator.implementation", "kite");
+    when(mockCommandLine.getOptionValue("parquet-configurator-implementation")).thenReturn(expectedValue.toString());
+
+    testBaseSqoopTool.applyCommonOptions(mockCommandLine, testSqoopOptions);
+
+    assertEquals(expectedValue, testSqoopOptions.getParquetConfiguratorImplementation());
+  }
+
+  @Test
+  public void testApplyCommonOptionsThrowsWhenInvalidParquetJobConfigurationImplementationIsSet() throws Exception {
+    when(mockCommandLine.getOptionValue("parquet-configurator-implementation")).thenReturn("this_is_definitely_not_valid");
+
+    exception.expectMessage("Invalid Parquet job configurator implementation is set: this_is_definitely_not_valid. Supported values are: [KITE, HADOOP]");
+    testBaseSqoopTool.applyCommonOptions(mockCommandLine, testSqoopOptions);
+  }
+
+  @Test
+  public void testApplyCommonOptionsDoesNotChangeDefaultParquetJobConfigurationImplementationWhenNothingIsSet() throws Exception {
+    testBaseSqoopTool.applyCommonOptions(mockCommandLine, testSqoopOptions);
+
+    assertEquals(KITE, testSqoopOptions.getParquetConfiguratorImplementation());
+  }
 }
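For context, the tests above pin down how applyCommonOptions is expected to resolve the Parquet job
configurator implementation: the parquet-configurator-implementation command-line option wins over the
parquetjob.configurator.implementation configuration property, the value is parsed case-insensitively,
and the SqoopOptions default (KITE) is left untouched when neither source provides a value. The snippet
below is only a rough sketch of that behaviour, not the actual BaseSqoopTool code; the helper name is
invented for illustration and it relies solely on the accessors used in the tests.

  // Sketch only: approximates the resolution order exercised by the tests above.
  static ParquetJobConfiguratorImplementation resolveImplementation(CommandLine in, SqoopOptions out) {
    String value = in.getOptionValue("parquet-configurator-implementation");
    if (value == null || value.isEmpty()) {
      // Fall back to the Hadoop configuration property used by the "FromConfiguration" test.
      value = out.getConf().get("parquetjob.configurator.implementation");
    }
    if (value == null || value.isEmpty()) {
      // Nothing set anywhere: keep whatever default SqoopOptions already holds (KITE in the tests).
      return out.getParquetConfiguratorImplementation();
    }
    try {
      // Case-insensitive parsing, as covered by the "CaseInsensitively" test.
      return ParquetJobConfiguratorImplementation.valueOf(value.toUpperCase());
    } catch (IllegalArgumentException e) {
      throw new RuntimeException("Invalid Parquet job configurator implementation is set: " + value
          + ". Supported values are: " + java.util.Arrays.toString(ParquetJobConfiguratorImplementation.values()));
    }
  }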
http://git-wip-us.apache.org/repos/asf/sqoop/blob/8e45d2b3/src/test/org/apache/sqoop/util/ParquetReader.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/util/ParquetReader.java b/src/test/org/apache/sqoop/util/ParquetReader.java
index 56e03a0..f1c2fe1 100644
--- a/src/test/org/apache/sqoop/util/ParquetReader.java
+++ b/src/test/org/apache/sqoop/util/ParquetReader.java
@@ -20,8 +20,16 @@ package org.apache.sqoop.util;
 
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import parquet.avro.AvroParquetReader;
+import parquet.hadoop.Footer;
+import parquet.hadoop.ParquetFileReader;
+import parquet.hadoop.metadata.BlockMetaData;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.hadoop.metadata.CompressionCodecName;
+import parquet.hadoop.util.HiddenFileFilter;
 
 import java.io.IOException;
 import java.util.ArrayDeque;
@@ -29,8 +37,10 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Deque;
+import java.util.Iterator;
 import java.util.List;
 
+import static java.util.Arrays.asList;
 import static org.apache.sqoop.util.FileSystemUtil.isFile;
 import static org.apache.sqoop.util.FileSystemUtil.listFiles;
@@ -95,6 +105,49 @@ public class ParquetReader implements AutoCloseable {
     return result;
   }
 
+  public List<String> readAllInCsvSorted() {
+    List<String> result = readAllInCsv();
+    Collections.sort(result);
+
+    return result;
+  }
+
+  public CompressionCodecName getCodec() {
+    List<Footer> footers = getFooters();
+
+    Iterator<Footer> footersIterator = footers.iterator();
+    if (footersIterator.hasNext()) {
+      Footer footer = footersIterator.next();
+
+      Iterator<BlockMetaData> blockMetaDataIterator = footer.getParquetMetadata().getBlocks().iterator();
+      if (blockMetaDataIterator.hasNext()) {
+        BlockMetaData blockMetaData = blockMetaDataIterator.next();
+
+        Iterator<ColumnChunkMetaData> columnChunkMetaDataIterator = blockMetaData.getColumns().iterator();
+
+        if (columnChunkMetaDataIterator.hasNext()) {
+          ColumnChunkMetaData columnChunkMetaData = columnChunkMetaDataIterator.next();
+
+          return columnChunkMetaData.getCodec();
+        }
+      }
+    }
+
+    return null;
+  }
+
+  private List<Footer> getFooters() {
+    final List<Footer> footers;
+    try {
+      FileSystem fs = pathToRead.getFileSystem(configuration);
+      List<FileStatus> statuses = asList(fs.listStatus(pathToRead, HiddenFileFilter.INSTANCE));
+      footers = ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(configuration, statuses, false);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return footers;
+  }
+
   private String convertToCsv(GenericRecord record) {
     StringBuilder result = new StringBuilder();
     for (int i = 0; i < record.getSchema().getFields().size(); i++) {
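As a usage note for the new ParquetReader helpers: getCodec() reads the footers of the Parquet files
under pathToRead and returns the codec of the first column chunk of the first row group it finds, or
null when no footer exists. The sketch below shows how a compression assertion in a test might look;
it assumes a ParquetReader(Path) constructor and a Snappy-compressed import, and the output directory
is a placeholder, none of which are part of this change.

  // Illustrative sketch only, not part of this commit.
  @Test
  public void testImportedParquetFilesAreSnappyCompressed() throws Exception {
    Path tableDirectory = new Path("/tmp/sqoop-test/parquet-import-output"); // placeholder path
    try (ParquetReader reader = new ParquetReader(tableDirectory)) {
      // getCodec() reports the codec of the first column chunk it encounters under the path.
      assertEquals(CompressionCodecName.SNAPPY, reader.getCodec());
    }
  }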