http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsSequenceWriter.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsSequenceWriter.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsSequenceWriter.java
new file mode 100644
index 0000000..eb80121
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsSequenceWriter.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs.hdfsWriter;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+
+
+import java.io.IOException;
+
+public class HdfsSequenceWriter extends GenericHdfsWriter {
+
+  private SequenceFile.Writer filewriter;
+  private Text text;
+
+  public void initialize(Path filepath, Configuration conf, CompressionCodec codec) throws IOException {
+    if (codec != null) {
+      filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
+              conf, filepath, Text.class, NullWritable.class,
+              SequenceFile.CompressionType.BLOCK, codec);
+    } else {
+      filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
+              conf, filepath, Text.class, NullWritable.class, SequenceFile.CompressionType.NONE);
+    }
+
+    text = new Text();
+  }
+
+  @Override
+  public void write(String csv) throws IOException {
+    text.set(csv);
+    filewriter.append(text, NullWritable.get());
+  }
+
+  public void destroy() throws IOException {
+    filewriter.close();
+  }
+}
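The new writer follows a plain initialize/write/destroy lifecycle. A minimal caller sketch, assuming GenericHdfsWriter declares exactly the three methods seen above (the base class is not part of this diff); the path and records are illustrative placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    // Hypothetical caller; the connector's loader would do the equivalent.
    public class WriterLifecycleSketch {
      public static void main(String[] args) throws Exception {
        GenericHdfsWriter writer = new HdfsSequenceWriter();
        Configuration conf = new Configuration();
        // null codec selects the uncompressed branch of initialize()
        writer.initialize(new Path("/tmp/part-0001"), conf, null);
        writer.write("1,'alice',100");
        writer.write("2,'bob',200");
        writer.destroy(); // closes the underlying SequenceFile.Writer
      }
    }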
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsTextWriter.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsTextWriter.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsTextWriter.java
new file mode 100644
index 0000000..78cf973
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsTextWriter.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs.hdfsWriter;
+
+import com.google.common.base.Charsets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.sqoop.connector.hdfs.HdfsConstants;
+
+import java.io.BufferedWriter;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+
+public class HdfsTextWriter extends GenericHdfsWriter {
+
+  private BufferedWriter filewriter;
+
+  @Override
+  public void initialize(Path filepath, Configuration conf, CompressionCodec codec) throws IOException {
+    FileSystem fs = filepath.getFileSystem(conf);
+
+    DataOutputStream filestream = fs.create(filepath, false);
+    if (codec != null) {
+      filewriter = new BufferedWriter(new OutputStreamWriter(
+              codec.createOutputStream(filestream, codec.createCompressor()),
+              Charsets.UTF_8));
+    } else {
+      filewriter = new BufferedWriter(new OutputStreamWriter(
+              filestream, Charsets.UTF_8));
+    }
+  }
+
+  @Override
+  public void write(String csv) throws IOException {
+    filewriter.write(csv + HdfsConstants.DEFAULT_RECORD_DELIMITER);
+  }
+
+  @Override
+  public void destroy() throws IOException {
+    filewriter.close();
+  }
+}
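Both writers accept an optional CompressionCodec. A short sketch of how a caller might resolve one from a class name before calling initialize() — the reflective idiom is standard Hadoop, but the class name here is illustrative and not part of this patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.util.ReflectionUtils;

    public class CodecSketch {
      // Instantiates a codec reflectively; ReflectionUtils injects conf
      // if the codec implements Configurable.
      static CompressionCodec codecFor(String className, Configuration conf)
          throws ClassNotFoundException {
        Class<? extends CompressionCodec> clz =
            Class.forName(className).asSubclass(CompressionCodec.class);
        return ReflectionUtils.newInstance(clz, conf);
      }

      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        CompressionCodec gzip =
            codecFor("org.apache.hadoop.io.compress.GzipCodec", conf);
        System.out.println(gzip.getDefaultExtension()); // ".gz"
      }
    }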
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/resources/hdfs-connector-resources.properties
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/resources/hdfs-connector-resources.properties b/connector/connector-hdfs/src/main/resources/hdfs-connector-resources.properties
new file mode 100644
index 0000000..3125911
--- /dev/null
+++ b/connector/connector-hdfs/src/main/resources/hdfs-connector-resources.properties
@@ -0,0 +1,58 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Generic HDFS Connector Resources
+
+############################
+# Connection Form
+#
+connection.label = Connection configuration
+connection.help = You must supply the information requested in order to \
+  create a connection object.
+
+connection.dummy.label = Dummy parameter needed to get HDFS connector to register
+connection.dummy.help = You can write anything here. Doesn't matter.
+
+# Output Form
+#
+output.label = Output configuration
+output.help = You must supply the information requested in order to \
+  get information where you want to store your data.
+
+output.storageType.label = Storage type
+output.storageType.help = Target on Hadoop ecosystem where to store data
+
+output.outputFormat.label = Output format
+output.outputFormat.help = Format in which data should be serialized
+
+output.compression.label = Compression format
+output.compression.help = Compression that should be used for the data
+
+output.customCompression.label = Custom compression format
+output.customCompression.help = Full class name of the custom compression
+
+output.outputDirectory.label = Output directory
+output.outputDirectory.help = Output directory for final data
+
+output.ignored.label = Ignored
+output.ignored.help = This value is ignored
+
+# Input Form
+#
+input.label = Input configuration
+input.help = Specifies information required to get data from Hadoop ecosystem
+
+input.inputDirectory.label = Input directory
+input.inputDirectory.help = Directory that should be exported
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/resources/sqoopconnector.properties
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/resources/sqoopconnector.properties b/connector/connector-hdfs/src/main/resources/sqoopconnector.properties
new file mode 100644
index 0000000..fa4e5e1
--- /dev/null
+++ b/connector/connector-hdfs/src/main/resources/sqoopconnector.properties
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Generic HDFS Connector Properties
+org.apache.sqoop.connector.class = org.apache.sqoop.connector.hdfs.HdfsConnector
+org.apache.sqoop.connector.name = hdfs-connector
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
index 39d48c7..1e8ab52 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
@@ -180,6 +180,10 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
       return null;
     }
 
+    if (schema == null) {
+      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0006);
+    }
+
     if (fields.length != schema.getColumns().size()) {
       throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
         "The data " + getTextData() + " has the wrong number of fields.");
@@ -189,7 +193,8 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
     Column[] cols = schema.getColumns().toArray(new Column[fields.length]);
     for (int i = 0; i < fields.length; i++) {
       Type colType = cols[i].getType();
-      if (fields[i].equals("NULL")) {
+      //TODO: Replace with proper isNull method. Actually the entire content of the loop should be a parse method
+      if (fields[i].equals("NULL") || fields[i].equals("null") || fields[i].equals("'null'") || fields[i].isEmpty()) {
         out[i] = null;
         continue;
       }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java
index 9219074..4d41679 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java
@@ -37,7 +37,9 @@ public enum IntermediateDataFormatError implements ErrorCode {
   INTERMEDIATE_DATA_FORMAT_0004("Unknown column type."),
 
   /** Number of fields. */
-  INTERMEDIATE_DATA_FORMAT_0005("Wrong number of fields.")
+  INTERMEDIATE_DATA_FORMAT_0005("Wrong number of fields."),
+
+  INTERMEDIATE_DATA_FORMAT_0006("Schema missing.")
 
   ;
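The TODO in the hunk above asks for a proper isNull method in place of the inline string comparisons. A minimal sketch of what such a helper could look like — the method name is hypothetical, and equalsIgnoreCase is slightly broader than the literal checks in the patch:

    // Hypothetical helper consolidating the null-token check from the loop above.
    final class CsvNullSketch {
      // Tokens the CSV intermediate format currently treats as SQL NULL.
      static boolean isNullToken(String field) {
        return field == null
            || field.isEmpty()
            || field.equalsIgnoreCase("NULL")  // covers "NULL" and "null"
            || field.equals("'null'");         // quoted variant checked above
      }

      public static void main(String[] args) {
        System.out.println(isNullToken("NULL"));   // true
        System.out.println(isNullToken("'null'")); // true
        System.out.println(isNullToken("0"));      // false
      }
    }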
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/pom.xml
----------------------------------------------------------------------
diff --git a/connector/pom.xml b/connector/pom.xml
index d642c3e..e98a0fc 100644
--- a/connector/pom.xml
+++ b/connector/pom.xml
@@ -35,10 +35,11 @@ limitations under the License.
   <modules>
     <module>connector-sdk</module>
     <module>connector-generic-jdbc</module>
-    <!-- Uncomment and finish connectors after sqoop framework will become stable
-    <module>connector-mysql-jdbc</module>
-    <module>connector-mysql-fastpath</module>
-    -->
+    <module>connector-hdfs</module>
+    <!-- Uncomment and finish connectors after sqoop framework will become stable
+    <module>connector-mysql-jdbc</module>
+    <module>connector-mysql-fastpath</module>
+    -->
   </modules>
 </project>
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java
index f19a23e..46257f2 100644
--- a/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java
+++ b/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java
@@ -18,20 +18,13 @@
 package org.apache.sqoop.framework;
 
 import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
-import org.apache.sqoop.framework.configuration.InputForm;
 import org.apache.sqoop.framework.configuration.JobConfiguration;
-import org.apache.sqoop.framework.configuration.OutputCompression;
-import org.apache.sqoop.framework.configuration.OutputForm;
 import org.apache.sqoop.framework.configuration.ThrottlingForm;
 import org.apache.sqoop.validation.Status;
 import org.apache.sqoop.validation.Validation;
 import org.apache.sqoop.validation.Validator;
 
-/**
- * Validate framework configuration objects
- */
 public class FrameworkValidator extends Validator {
-
   @Override
   public Validation validateConnection(Object connectionConfiguration) {
     Validation validation = new Validation(ConnectionConfiguration.class);
@@ -39,61 +32,16 @@ public class FrameworkValidator extends Validator {
     return validation;
   }
 
-
   @Override
   public Validation validateJob(Object jobConfiguration) {
-    JobConfiguration configuration = (JobConfiguration)jobConfiguration;
     Validation validation = new Validation(JobConfiguration.class);
-    validateThrottingForm(validation, configuration.throttling);
-    return super.validateJob(jobConfiguration);
-  }
-
-//  private Validation validateExportJob(Object jobConfiguration) {
-//    Validation validation = new Validation(ExportJobConfiguration.class);
-//    ExportJobConfiguration configuration = (ExportJobConfiguration)jobConfiguration;
-//
-//    validateInputForm(validation, configuration.input);
-//    validateThrottingForm(validation, configuration.throttling);
-//
-//    return validation;
-//  }
-//
-//  private Validation validateImportJob(Object jobConfiguration) {
-//    Validation validation = new Validation(ImportJobConfiguration.class);
-//    ImportJobConfiguration configuration = (ImportJobConfiguration)jobConfiguration;
-//
-//    validateOutputForm(validation, configuration.output);
-//    validateThrottingForm(validation, configuration.throttling);
-//
-//    return validation;
-//  }
+    JobConfiguration conf = (JobConfiguration)jobConfiguration;
+    validateThrottlingForm(validation,conf.throttling);
 
-//  private void validateInputForm(Validation validation, InputForm input) {
-//    if(input.inputDirectory == null || input.inputDirectory.isEmpty()) {
-//      validation.addMessage(Status.UNACCEPTABLE, "input", "inputDirectory", "Input directory is empty");
-//    }
-//  }
-//
-//  private void validateOutputForm(Validation validation, OutputForm output) {
-//    if(output.outputDirectory == null || output.outputDirectory.isEmpty()) {
-//      validation.addMessage(Status.UNACCEPTABLE, "output", "outputDirectory", "Output directory is empty");
-//    }
-//    if(output.customCompression != null &&
-//      output.customCompression.trim().length() > 0 &&
-//      output.compression != OutputCompression.CUSTOM) {
-//      validation.addMessage(Status.UNACCEPTABLE, "output", "compression",
-//        "custom compression should be blank as " + output.compression + " is being used.");
-//    }
-//    if(output.compression == OutputCompression.CUSTOM &&
-//      (output.customCompression == null ||
-//        output.customCompression.trim().length() == 0)
-//      ) {
-//      validation.addMessage(Status.UNACCEPTABLE, "output", "compression",
-//        "custom compression is blank.");
-//    }
-//  }
+    return validation;
+  };
 
-  private void validateThrottingForm(Validation validation, ThrottlingForm throttling) {
+  private void validateThrottlingForm(Validation validation, ThrottlingForm throttling) {
     if(throttling.extractors != null && throttling.extractors < 1) {
       validation.addMessage(Status.UNACCEPTABLE, "throttling", "extractors", "You need to specify more than one extractor");
     }
@@ -102,4 +50,5 @@ public class FrameworkValidator extends Validator {
       validation.addMessage(Status.UNACCEPTABLE, "throttling", "loaders", "You need to specify more than one loader");
     }
   }
+
 }
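Worth noting: the old validateJob built a local Validation and then discarded it by returning super.validateJob(); the rewrite returns its own result. A minimal sketch of exercising it — assuming Validation exposes its accumulated result via a getStatus() accessor, which is not shown in this diff:

    import org.apache.sqoop.framework.FrameworkValidator;
    import org.apache.sqoop.framework.configuration.JobConfiguration;
    import org.apache.sqoop.validation.Validation;

    public class ValidatorSketch {
      public static void main(String[] args) {
        JobConfiguration job = new JobConfiguration();
        job.throttling.extractors = 0; // below the minimum of 1 checked above

        Validation validation = new FrameworkValidator().validateJob(job);
        // Expect an UNACCEPTABLE message on throttling/extractors.
        System.out.println(validation.getStatus());
      }
    }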
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/core/src/main/java/org/apache/sqoop/framework/JobManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/JobManager.java b/core/src/main/java/org/apache/sqoop/framework/JobManager.java
index 5571928..b1b37f6 100644
--- a/core/src/main/java/org/apache/sqoop/framework/JobManager.java
+++ b/core/src/main/java/org/apache/sqoop/framework/JobManager.java
@@ -36,6 +36,7 @@ import org.apache.sqoop.model.MJob;
 import org.apache.sqoop.model.MSubmission;
 import org.apache.sqoop.repository.Repository;
 import org.apache.sqoop.repository.RepositoryManager;
+import org.apache.sqoop.schema.Schema;
 import org.apache.sqoop.submission.SubmissionStatus;
 import org.apache.sqoop.submission.counter.Counters;
 import org.apache.sqoop.utils.ClassUtils;
@@ -434,12 +435,17 @@ public class JobManager implements Reconfigurable {
         request.getConnectorJobConfig(Direction.FROM)));
 
     // @TODO(Abe): Alter behavior of Schema here. Need from Schema.
-    // Retrieve and persist the schema
-    request.getSummary().setConnectorSchema(initializer.getSchema(
-        initializerContext,
-        request.getConnectorConnectionConfig(Direction.FROM),
-        request.getConnectorJobConfig(Direction.FROM)
-    ));
+
+
+    Schema fromSchema = initializer.getSchema(initializerContext,
+        request.getConnectorConnectionConfig(Direction.FROM),
+        request.getConnectorJobConfig(Direction.FROM));
+
+    // request.getSummary().setConnectorSchema(initializer.getSchema(
+    //     initializerContext,
+    //     request.getConnectorConnectionConfig(ConnectorType.FROM),
+    //     request.getConnectorJobConfig(ConnectorType.FROM)
+    // ));
 
     // Initialize To Connector callback.
     baseCallback = request.getToCallback();
@@ -468,6 +474,11 @@ public class JobManager implements Reconfigurable {
         request.getConnectorJobConfig(Direction.TO)));
 
     // @TODO(Abe): Alter behavior of Schema here. Need To Schema.
+
+    Schema toSchema = initializer.getSchema(initializerContext,
+        request.getConnectorConnectionConfig(Direction.TO),
+        request.getConnectorJobConfig(Direction.TO));
+
     // Retrieve and persist the schema
     // request.getSummary().setConnectorSchema(initializer.getSchema(
     //     initializerContext,
@@ -475,6 +486,12 @@ public class JobManager implements Reconfigurable {
     //     request.getConnectorJobConfig(ConnectorType.TO)
     // ));
 
+    //TODO: Need better logic here
+    if (fromSchema != null)
+      request.getSummary().setConnectorSchema(fromSchema);
+    else
+      request.getSummary().setConnectorSchema(toSchema);
+
     // Bootstrap job from framework perspective
     prepareSubmission(request);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
index be6099e..bf3f785 100644
--- a/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
+++ b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
@@ -176,9 +176,11 @@ public class SubmissionRequest {
     switch(type) {
       case FROM:
         fromConnector = connector;
+        break;
 
       case TO:
         toConnector = connector;
+        break;
 
       default:
         throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
@@ -238,10 +240,10 @@ public class SubmissionRequest {
     switch(type) {
       case FROM:
         fromConnectorConnectionConfig = config;
-
+        break;
       case TO:
         toConnectorConnectionConfig = config;
-
+        break;
       default:
         throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
     }
@@ -264,10 +266,10 @@ public class SubmissionRequest {
     switch(type) {
       case FROM:
         fromConnectorJobConfig = config;
-
+        break;
       case TO:
         toConnectorJobConfig = config;
-
+        break;
       default:
         throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
     }
@@ -290,10 +292,10 @@ public class SubmissionRequest {
     switch(type) {
       case FROM:
         fromFrameworkConnectionConfig = config;
-
+        break;
       case TO:
         toFrameworkConnectionConfig = config;
-
+        break;
       default:
         throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
     }
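The added break statements matter: without them, each case falls through, so a FROM assignment would also run the TO branch and then reach the default throw. A self-contained illustration of the behavior being fixed (generic Java, not Sqoop code):

    public class FallThroughSketch {
      enum Direction { FROM, TO }

      public static void main(String[] args) {
        String from = null, to = null;
        Direction type = Direction.FROM;
        switch (type) {
          case FROM:
            from = "set";
            // no break: execution falls through into the next case
          case TO:
            to = "set";
        }
        // Both variables end up assigned -- the bug the patch fixes.
        System.out.println("from=" + from + ", to=" + to); // from=set, to=set
      }
    }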
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/core/src/main/java/org/apache/sqoop/framework/configuration/ExportJobConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/ExportJobConfiguration.java b/core/src/main/java/org/apache/sqoop/framework/configuration/ExportJobConfiguration.java
deleted file mode 100644
index 6665429..0000000
--- a/core/src/main/java/org/apache/sqoop/framework/configuration/ExportJobConfiguration.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.framework.configuration;
-
-import org.apache.sqoop.model.ConfigurationClass;
-import org.apache.sqoop.model.Form;
-
-/**
- *
- */
-@ConfigurationClass
-public class ExportJobConfiguration {
-
-  @Form public InputForm input;
-
-  @Form public ThrottlingForm throttling;
-
-  public ExportJobConfiguration() {
-    input = new InputForm();
-    throttling = new ThrottlingForm();
-  }
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/core/src/main/java/org/apache/sqoop/framework/configuration/ImportJobConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/ImportJobConfiguration.java b/core/src/main/java/org/apache/sqoop/framework/configuration/ImportJobConfiguration.java
deleted file mode 100644
index 2a35eb9..0000000
--- a/core/src/main/java/org/apache/sqoop/framework/configuration/ImportJobConfiguration.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.framework.configuration;
-
-import org.apache.sqoop.model.ConfigurationClass;
-import org.apache.sqoop.model.Form;
-
-/**
- *
- */
-@ConfigurationClass
-public class ImportJobConfiguration {
-
-  @Form public OutputForm output;
-
-  @Form public ThrottlingForm throttling;
-
-  public ImportJobConfiguration() {
-    output = new OutputForm();
-    throttling = new ThrottlingForm();
-  }
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/core/src/main/java/org/apache/sqoop/framework/configuration/InputForm.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/InputForm.java b/core/src/main/java/org/apache/sqoop/framework/configuration/InputForm.java
deleted file mode 100644
index d5cbeec..0000000
--- a/core/src/main/java/org/apache/sqoop/framework/configuration/InputForm.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.framework.configuration;
-
-import org.apache.sqoop.model.FormClass;
-import org.apache.sqoop.model.Input;
-
-/**
- *
- */
-@FormClass
-public class InputForm {
-
-  @Input(size = 255) public String inputDirectory;
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/core/src/main/java/org/apache/sqoop/framework/configuration/JobConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/JobConfiguration.java b/core/src/main/java/org/apache/sqoop/framework/configuration/JobConfiguration.java
index 7c653bf..0abc611 100644
--- a/core/src/main/java/org/apache/sqoop/framework/configuration/JobConfiguration.java
+++ b/core/src/main/java/org/apache/sqoop/framework/configuration/JobConfiguration.java
@@ -22,8 +22,8 @@ import org.apache.sqoop.model.Form;
 
 @ConfigurationClass
 public class JobConfiguration {
-
-  @Form public ThrottlingForm throttling;
+  @Form
+  public ThrottlingForm throttling;
 
   public JobConfiguration() {
     throttling = new ThrottlingForm();
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/core/src/main/java/org/apache/sqoop/framework/configuration/OutputCompression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/OutputCompression.java b/core/src/main/java/org/apache/sqoop/framework/configuration/OutputCompression.java
deleted file mode 100644
index 6cac46d..0000000
--- a/core/src/main/java/org/apache/sqoop/framework/configuration/OutputCompression.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.framework.configuration;
-
-/**
- * Supported compressions
- */
-public enum OutputCompression {
-  NONE,
-  DEFAULT,
-  DEFLATE,
-  GZIP,
-  BZIP2,
-  LZO,
-  LZ4,
-  SNAPPY,
-  CUSTOM,
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/core/src/main/java/org/apache/sqoop/framework/configuration/OutputForm.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/OutputForm.java b/core/src/main/java/org/apache/sqoop/framework/configuration/OutputForm.java
deleted file mode 100644
index b2cdb44..0000000
--- a/core/src/main/java/org/apache/sqoop/framework/configuration/OutputForm.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.framework.configuration;
-
-import org.apache.sqoop.model.FormClass;
-import org.apache.sqoop.model.Input;
-
-/**
- *
- */
-@FormClass
-public class OutputForm {
-
-  @Input public StorageType storageType;
-
-  @Input public OutputFormat outputFormat;
-
-  @Input public OutputCompression compression;
-
-  @Input(size = 255) public String customCompression;
-
-  @Input(size = 255) public String outputDirectory;
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/core/src/main/java/org/apache/sqoop/framework/configuration/OutputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/OutputFormat.java b/core/src/main/java/org/apache/sqoop/framework/configuration/OutputFormat.java
deleted file mode 100644
index 4cd3589..0000000
--- a/core/src/main/java/org/apache/sqoop/framework/configuration/OutputFormat.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.framework.configuration;
-
-/**
- * Various supported formats on disk
- */
-public enum OutputFormat {
-  /**
-   * Comma separated text file
-   */
-  TEXT_FILE,
-
-  /**
-   * Sequence file
-   */
-  SEQUENCE_FILE,
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/core/src/main/java/org/apache/sqoop/framework/configuration/StorageType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/StorageType.java b/core/src/main/java/org/apache/sqoop/framework/configuration/StorageType.java
deleted file mode 100644
index dbe9f95..0000000
--- a/core/src/main/java/org/apache/sqoop/framework/configuration/StorageType.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.framework.configuration;
-
-/**
- * Various storage types that Sqoop is supporting
- */
-public enum StorageType {
-  /**
-   * Direct HDFS import
-   */
-  HDFS,
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/core/src/test/java/org/apache/sqoop/framework/TestFrameworkValidator.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/sqoop/framework/TestFrameworkValidator.java b/core/src/test/java/org/apache/sqoop/framework/TestFrameworkValidator.java
index f875ceb..90395ac 100644
--- a/core/src/test/java/org/apache/sqoop/framework/TestFrameworkValidator.java
+++ b/core/src/test/java/org/apache/sqoop/framework/TestFrameworkValidator.java
@@ -17,15 +17,7 @@
  */
 package org.apache.sqoop.framework;
 
-import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
-import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
-import org.apache.sqoop.framework.configuration.ImportJobConfiguration;
-import org.apache.sqoop.framework.configuration.OutputCompression;
-import org.apache.sqoop.model.MJob;
-import org.apache.sqoop.validation.Status;
-import org.apache.sqoop.validation.Validation;
-import org.junit.Before;
-import org.junit.Test;
+//import org.apache.sqoop.framework.configuration.OutputCompression;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java b/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java
index 31df04c..f19e01c 100644
--- a/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java
+++ b/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java
@@ -28,7 +28,7 @@ import org.apache.sqoop.connector.ConnectorManager;
 import org.apache.sqoop.connector.spi.MetadataUpgrader;
 import org.apache.sqoop.connector.spi.SqoopConnector;
 import org.apache.sqoop.framework.FrameworkManager;
-import org.apache.sqoop.framework.configuration.ImportJobConfiguration;
+//import org.apache.sqoop.framework.configuration.ImportJobConfiguration;
 import org.apache.sqoop.model.ConfigurationClass;
 import org.apache.sqoop.model.FormUtils;
 import org.apache.sqoop.model.MConnection;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
index ff328cb..b05954b 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
@@ -80,49 +80,9 @@ public class MapreduceExecutionEngine extends ExecutionEngine {
     if(request.getExtractors() != null) {
       context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, request.getExtractors());
     }
-
-    // @TODO(Abe): Move to HDFS connector.
-//    if(jobConf.output.outputFormat == OutputFormat.TEXT_FILE) {
-//      context.setString(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName());
-//    } else if(jobConf.output.outputFormat == OutputFormat.SEQUENCE_FILE) {
-//      context.setString(JobConstants.JOB_ETL_LOADER, HdfsSequenceImportLoader.class.getName());
-//    } else {
-//      throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0024,
-//        "Format: " + jobConf.output.outputFormat);
-//    }
-//    if(getCompressionCodecName(jobConf) != null) {
-//      context.setString(JobConstants.HADOOP_COMPRESS_CODEC,
-//        getCompressionCodecName(jobConf));
-//      context.setBoolean(JobConstants.HADOOP_COMPRESS, true);
-//    }
   }
 
-  // @TODO(Abe): Move to HDFS connector.
-//  private String getCompressionCodecName(ImportJobConfiguration jobConf) {
-//    if(jobConf.output.compression == null)
-//      return null;
-//    switch(jobConf.output.compression) {
-//      case NONE:
-//        return null;
-//      case DEFAULT:
-//        return "org.apache.hadoop.io.compress.DefaultCodec";
-//      case DEFLATE:
-//        return "org.apache.hadoop.io.compress.DeflateCodec";
-//      case GZIP:
-//        return "org.apache.hadoop.io.compress.GzipCodec";
-//      case BZIP2:
-//        return "org.apache.hadoop.io.compress.BZip2Codec";
-//      case LZO:
-//        return "com.hadoop.compression.lzo.LzoCodec";
-//      case LZ4:
-//        return "org.apache.hadoop.io.compress.Lz4Codec";
-//      case SNAPPY:
-//        return "org.apache.hadoop.io.compress.SnappyCodec";
-//      case CUSTOM:
-//        return jobConf.output.customCompression.trim();
-//    }
-//    return null;
-//  }
+
 
   /**
    * Our execution engine have additional dependencies that needs to be available
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/execution/mapreduce/src/main/java/org/apache/sqoop/job/PrefixContext.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/PrefixContext.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/PrefixContext.java
deleted file mode 100644
index c3beed7..0000000
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/PrefixContext.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.sqoop.common.ImmutableContext;
-
-/**
- * Implementation of immutable context that is based on Hadoop configuration
- * object. Each context property is prefixed with special prefix and loaded
- * directly.
- */
-public class PrefixContext implements ImmutableContext {
-
-  Configuration configuration;
-  String prefix;
-
-  public PrefixContext(Configuration configuration, String prefix) {
-    this.configuration = configuration;
-    this.prefix = prefix;
-  }
-
-  @Override
-  public String getString(String key) {
-    return configuration.get(prefix + key);
-  }
-
-  @Override
-  public String getString(String key, String defaultValue) {
-    return configuration.get(prefix + key, defaultValue);
-  }
-
-  @Override
-  public long getLong(String key, long defaultValue) {
-    return configuration.getLong(prefix + key, defaultValue);
-  }
-
-  @Override
-  public int getInt(String key, int defaultValue) {
-    return configuration.getInt(prefix + key, defaultValue);
-  }
-
-  @Override
-  public boolean getBoolean(String key, boolean defaultValue) {
-    return configuration.getBoolean(prefix + key, defaultValue);
-  }
-
-  /*
-   * TODO: Use getter methods for retrieval instead of
-   * exposing configuration directly.
-   */
-  public Configuration getConfiguration() {
-    return configuration;
-  }
-}
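PrefixContext, removed from this module here, is a thin namespacing shim over Hadoop's Configuration: every lookup just prepends a fixed prefix to the key. A stripped-down illustration of the idea using only the Configuration API — the prefix value is illustrative, not a real Sqoop constant:

    import org.apache.hadoop.conf.Configuration;

    public class PrefixLookupSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        String prefix = "example.connector.from.context."; // illustrative prefix
        conf.set(prefix + "inputDirectory", "/data/in");

        // A PrefixContext-style getString(key) simply prepends the prefix:
        System.out.println(conf.get(prefix + "inputDirectory")); // /data/in
      }
    }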
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java
deleted file mode 100644
index 27afd8c..0000000
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.etl;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.Seekable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.util.LineReader;
-import org.apache.log4j.Logger;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.etl.io.DataWriter;
-import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
-import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
-import org.apache.sqoop.job.MapreduceExecutionError;
-import org.apache.sqoop.job.PrefixContext;
-
-/**
- * Extract from HDFS.
- * Default field delimiter of a record is comma.
- */
-//public class HdfsExportExtractor extends Extractor<ConnectionConfiguration, ExportJobConfiguration, HdfsExportPartition> {
-//
-//  public static final Logger LOG = Logger.getLogger(HdfsExportExtractor.class);
-//
-//  private Configuration conf;
-//  private DataWriter dataWriter;
-//  private long rowRead = 0;
-//
-//  @Override
-//  public void extract(ExtractorContext context,
-//      ConnectionConfiguration connectionConfiguration,
-//      ExportJobConfiguration jobConfiguration, HdfsExportPartition partition) {
-//
-//    conf = ((PrefixContext) context.getContext()).getConfiguration();
-//    dataWriter = context.getDataWriter();
-//
-//    try {
-//      HdfsExportPartition p = partition;
-//      LOG.info("Working on partition: " + p);
-//      int numFiles = p.getNumberOfFiles();
-//      for (int i = 0; i < numFiles; i++) {
-//        extractFile(p.getFile(i), p.getOffset(i), p.getLength(i));
-//      }
-//    } catch (IOException e) {
-//      throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0017, e);
-//    }
-//  }
-//
-//  private void extractFile(Path file, long start, long length)
-//      throws IOException {
-//    long end = start + length;
-//    LOG.info("Extracting file " + file);
-//    LOG.info("\t from offset " + start);
-//    LOG.info("\t to offset " + end);
-//    LOG.info("\t of length " + length);
-//    if(isSequenceFile(file)) {
-//      extractSequenceFile(file, start, length);
-//    } else {
-//      extractTextFile(file, start, length);
-//    }
-//  }
-//
-//  /**
-//   * Extracts Sequence file
-//   * @param file
-//   * @param start
-//   * @param length
-//   * @throws IOException
-//   */
-//  private void extractSequenceFile(Path file, long start, long length)
-//      throws IOException {
-//    LOG.info("Extracting sequence file");
-//    long end = start + length;
-//    SequenceFile.Reader filereader = new SequenceFile.Reader(
-//        file.getFileSystem(conf), file, conf);
-//
-//    if (start > filereader.getPosition()) {
-//      filereader.sync(start); // sync to start
-//    }
-//
-//    Text line = new Text();
-//    boolean hasNext = filereader.next(line);
-//    while (hasNext) {
-//      rowRead++;
-//      dataWriter.writeStringRecord(line.toString());
-//      line = new Text();
-//      hasNext = filereader.next(line);
-//      if (filereader.getPosition() >= end && filereader.syncSeen()) {
-//        break;
-//      }
-//    }
-//    filereader.close();
-//  }
-//
-//  /**
-//   * Extracts Text file
-//   * @param file
-//   * @param start
-//   * @param length
-//   * @throws IOException
-//   */
-//  private void extractTextFile(Path file, long start, long length)
-//      throws IOException {
-//    LOG.info("Extracting text file");
-//    long end = start + length;
-//    FileSystem fs = file.getFileSystem(conf);
-//    FSDataInputStream filestream = fs.open(file);
-//    CompressionCodec codec = (new CompressionCodecFactory(conf)).getCodec(file);
-//    LineReader filereader;
-//    Seekable fileseeker = filestream;
-//
-//    // Hadoop 1.0 does not have support for custom record delimiter and thus
-//    // we
-//    // are supporting only default one.
-//    // We might add another "else if" case for SplittableCompressionCodec once
-//    // we drop support for Hadoop 1.0.
-//    if (codec == null) {
-//      filestream.seek(start);
-//      filereader = new LineReader(filestream);
-//    } else {
-//      filereader = new LineReader(codec.createInputStream(filestream,
-//          codec.createDecompressor()), conf);
-//      fileseeker = filestream;
-//    }
-//    if (start != 0) {
-//      // always throw away first record because
-//      // one extra line is read in previous split
-//      start += filereader.readLine(new Text(), 0);
-//    }
-//    int size;
-//    LOG.info("Start position: " + String.valueOf(start));
-//    long next = start;
-//    while (next <= end) {
-//      Text line = new Text();
-//      size = filereader.readLine(line, Integer.MAX_VALUE);
-//      if (size == 0) {
-//        break;
-//      }
-//      if (codec == null) {
-//        next += size;
-//      } else {
-//        next = fileseeker.getPos();
-//      }
-//      rowRead++;
-//      dataWriter.writeStringRecord(line.toString());
-//    }
-//    LOG.info("Extracting ended on position: " + fileseeker.getPos());
-//    filestream.close();
-//  }
-//
-//  @Override
-//  public long getRowsRead() {
-//    return rowRead;
-//  }
-//
-//  /**
-//   * Returns true if given file is sequence
-//   * @param file
-//   * @return boolean
-//   */
-//  private boolean isSequenceFile(Path file) {
-//    SequenceFile.Reader filereader = null;
-//    try {
-//      filereader = new SequenceFile.Reader(file.getFileSystem(conf), file, conf);
-//      filereader.close();
-//    } catch (IOException e) {
-//      return false;
-//    }
-//    return true;
-//  }
-//}
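The commented-out isSequenceFile above probes the format by opening a reader and treating any IOException as "not a sequence file". An alternative sketch that checks the three-byte magic header directly — hedged, since the reader-based probe also validates version and key/value metadata that this shortcut skips:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class SeqMagicSketch {
      // Hadoop sequence files begin with the bytes 'S' 'E' 'Q'.
      static boolean hasSeqMagic(FileSystem fs, Path file) throws IOException {
        byte[] magic = new byte[3];
        try (FSDataInputStream in = fs.open(file)) {
          in.readFully(0, magic); // positioned read of the first three bytes
        }
        return magic[0] == 'S' && magic[1] == 'E' && magic[2] == 'Q';
      }

      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        Path file = new Path(args[0]); // path supplied by the caller
        System.out.println(hasSeqMagic(file.getFileSystem(conf), file));
      }
    }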
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartition.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartition.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartition.java
deleted file mode 100644
index cdbdaa8..0000000
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartition.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sqoop.job.etl;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.fs.Path;
-
-/**
- * This class derives mostly from CombineFileSplit of Hadoop, i.e.
- * org.apache.hadoop.mapreduce.lib.input.CombineFileSplit.
- */
-public class HdfsExportPartition extends Partition {
-
-  private long lenFiles;
-  private int numFiles;
-  private Path[] files;
-  private long[] offsets;
-  private long[] lengths;
-  private String[] locations;
-
-  public HdfsExportPartition() {}
-
-  public HdfsExportPartition(Path[] files, long[] offsets,
-      long[] lengths, String[] locations) {
-    for(long length : lengths) {
-      this.lenFiles += length;
-    }
-    this.numFiles = files.length;
-    this.files = files;
-    this.offsets = offsets;
-    this.lengths = lengths;
-    this.locations = locations;
-  }
-
-  public long getLengthOfFiles() {
-    return lenFiles;
-  }
-
-  public int getNumberOfFiles() {
-    return numFiles;
-  }
-
-  public Path getFile(int i) {
-    return files[i];
-  }
-
-  public long getOffset(int i) {
-    return offsets[i];
-  }
-
-  public long getLength(int i) {
-    return lengths[i];
-  }
-
-  public String[] getLocations() {
-    return locations;
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    numFiles = in.readInt();
-
-    files = new Path[numFiles];
-    for(int i=0; i<numFiles; i++) {
-      files[i] = new Path(in.readUTF());
-    }
-
-    offsets = new long[numFiles];
-    for(int i=0; i<numFiles; i++) {
-      offsets[i] = in.readLong();
-    }
-
-    lengths = new long[numFiles];
-    for(int i=0; i<numFiles; i++) {
-      lengths[i] = in.readLong();
-    }
-
-    for(long length : lengths) {
-      lenFiles += length;
-    }
-
-    int numLocations = in.readInt();
-    if (numLocations == 0) {
-      locations = null;
-    } else {
-      locations = new String[numLocations];
-      for(int i=0; i<numLocations; i++) {
-        locations[i] = in.readUTF();
-      }
-    }
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeInt(numFiles);
-
-    for(Path file : files) {
-      out.writeUTF(file.toString());
-    }
-
-    for(long offset : offsets) {
-      out.writeLong(offset);
-    }
-
-    for(long length : lengths) {
-      out.writeLong(length);
-    }
-
-    if (locations == null || locations.length == 0) {
-      out.writeInt(0);
-    } else {
-      out.writeInt(locations.length);
-      for(String location : locations) {
-        out.writeUTF(location);
-      }
-    }
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder();
-    sb.append("{");
-    boolean first = true;
-    for(int i = 0; i < files.length; i++) {
-      if(first) {
-        first = false;
-      } else {
-        sb.append(", ");
-      }
-
-      sb.append(files[i]);
-      sb.append(" (offset=").append(offsets[i]);
-      sb.append(", end=").append(offsets[i] + lengths[i]);
-      sb.append(", length=").append(lengths[i]);
-      sb.append(")");
-    }
-    sb.append("}");
-    return sb.toString();
-  }
-
-}
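HdfsExportPartition hand-rolls Writable-style serialization in write/readFields, where the invariant is that readFields must consume fields in exactly the order write emitted them. A small round-trip harness of the sort one could use to sanity-check such code; the partition class is deleted here, so this is purely illustrative of the pattern:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class RoundTripSketch {
      public static void main(String[] args) throws IOException {
        // Serialize: mirrors the shape of Partition.write(DataOutput).
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeInt(2);          // numFiles
        out.writeUTF("/data/a");  // file paths
        out.writeUTF("/data/b");
        out.writeLong(0L);        // offsets
        out.writeLong(128L);

        // Deserialize: mirrors Partition.readFields(DataInput), same field order.
        DataInputStream in =
            new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
        int numFiles = in.readInt();
        for (int i = 0; i < numFiles; i++) {
          System.out.println(in.readUTF());
        }
      }
    }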
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java
deleted file mode 100644
index b3590dc..0000000
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java
+++ /dev/null
@@ -1,552 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sqoop.job.etl;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.HashMap;
-import java.util.Set;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.net.NodeBase;
-import org.apache.hadoop.net.NetworkTopology;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.job.JobConstants;
-import org.apache.sqoop.job.MapreduceExecutionError;
-import org.apache.sqoop.job.PrefixContext;
-
-/**
- * This class derives mostly from CombineFileInputFormat of Hadoop, i.e.
- * org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.
- */
-public class HdfsExportPartitioner extends Partitioner {
-
-  public static final String SPLIT_MINSIZE_PERNODE =
-      "mapreduce.input.fileinputformat.split.minsize.per.node";
-  public static final String SPLIT_MINSIZE_PERRACK =
-      "mapreduce.input.fileinputformat.split.minsize.per.rack";
-
-  // ability to limit the size of a single split
-  private long maxSplitSize = 0;
-  private long minSplitSizeNode = 0;
-  private long minSplitSizeRack = 0;
-
-  // mapping from a rack name to the set of Nodes in the rack
-  private HashMap<String, Set<String>> rackToNodes =
-      new HashMap<String, Set<String>>();
-
-  @Override
-  public List<Partition> getPartitions(PartitionerContext context,
-      Object connectionConfiguration, Object jobConfiguration) {
-
-    Configuration conf = ((PrefixContext)context.getContext()).getConfiguration();
-
-    try {
-      long numInputBytes = getInputSize(conf);
-      maxSplitSize = numInputBytes / context.getMaxPartitions();
-
-      if(numInputBytes % context.getMaxPartitions() != 0 ) {
-        maxSplitSize += 1;
-       }
-
-      long minSizeNode = 0;
-      long minSizeRack = 0;
-      long maxSize = 0;
-
-      // the values specified by setxxxSplitSize() takes precedence over the
-      // values that might have been specified in the config
-      if (minSplitSizeNode != 0) {
-        minSizeNode = minSplitSizeNode;
-      } else {
-        minSizeNode = conf.getLong(SPLIT_MINSIZE_PERNODE, 0);
-      }
-      if (minSplitSizeRack != 0) {
-        minSizeRack = minSplitSizeRack;
-      } else {
-        minSizeRack = conf.getLong(SPLIT_MINSIZE_PERRACK, 0);
-      }
-      if (maxSplitSize != 0) {
-        maxSize = maxSplitSize;
-      } else {
-        maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0);
-      }
-      if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) {
-        throw new IOException("Minimum split size pernode " + minSizeNode +
-                              " cannot be larger than maximum split size " +
-                              maxSize);
-      }
-      if (minSizeRack != 0 && maxSize != 0 && minSizeRack > maxSize) {
-        throw new IOException("Minimum split size per rack" + minSizeRack +
-                              " cannot be larger than maximum split size " +
-                              maxSize);
-      }
!= 0 && minSizeNode > minSizeRack) { - throw new IOException("Minimum split size per node" + minSizeNode + - " cannot be smaller than minimum split " + - "size per rack " + minSizeRack); - } - - // all the files in input set - String indir = conf.get(JobConstants.HADOOP_INPUTDIR); - FileSystem fs = FileSystem.get(conf); - - List<Path> paths = new LinkedList<Path>(); - for(FileStatus status : fs.listStatus(new Path(indir))) { - if(!status.isDir()) { - paths.add(status.getPath()); - } - } - - List<Partition> partitions = new ArrayList<Partition>(); - if (paths.size() == 0) { - return partitions; - } - - // create splits for all files that are not in any pool. - getMoreSplits(conf, paths, - maxSize, minSizeNode, minSizeRack, partitions); - - // free up rackToNodes map - rackToNodes.clear(); - - return partitions; - - } catch (IOException e) { - throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0021, e); - } - } - - private long getInputSize(Configuration conf) throws IOException { - String indir = conf.get(JobConstants.HADOOP_INPUTDIR); - FileSystem fs = FileSystem.get(conf); - FileStatus[] files = fs.listStatus(new Path(indir)); - long count = 0; - for (FileStatus file : files) { - count += file.getLen(); - } - return count; - } - - /** - * Return all the splits in the specified set of paths - */ - private void getMoreSplits(Configuration conf, List<Path> paths, - long maxSize, long minSizeNode, long minSizeRack, - List<Partition> partitions) throws IOException { - - // all blocks for all the files in input set - OneFileInfo[] files; - - // mapping from a rack name to the list of blocks it has - HashMap<String, List<OneBlockInfo>> rackToBlocks = - new HashMap<String, List<OneBlockInfo>>(); - - // mapping from a block to the nodes on which it has replicas - HashMap<OneBlockInfo, String[]> blockToNodes = - new HashMap<OneBlockInfo, String[]>(); - - // mapping from a node to the list of blocks that it contains - HashMap<String, List<OneBlockInfo>> nodeToBlocks = - new HashMap<String, List<OneBlockInfo>>(); - - files = new OneFileInfo[paths.size()]; - if (paths.size() == 0) { - return; - } - - // populate all the blocks for all files - for (int i = 0; i < paths.size(); i++) { - files[i] = new OneFileInfo(paths.get(i), conf, isSplitable(conf, paths.get(i)), - rackToBlocks, blockToNodes, nodeToBlocks, - rackToNodes, maxSize); - } - - ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>(); - Set<String> nodes = new HashSet<String>(); - long curSplitSize = 0; - - // process all nodes and create splits that are local - // to a node. - for (Iterator<Map.Entry<String, - List<OneBlockInfo>>> iter = nodeToBlocks.entrySet().iterator(); - iter.hasNext();) { - - Map.Entry<String, List<OneBlockInfo>> one = iter.next(); - nodes.add(one.getKey()); - List<OneBlockInfo> blocksInNode = one.getValue(); - - // for each block, copy it into validBlocks. Delete it from - // blockToNodes so that the same block does not appear in - // two different splits. - for (OneBlockInfo oneblock : blocksInNode) { - if (blockToNodes.containsKey(oneblock)) { - validBlocks.add(oneblock); - blockToNodes.remove(oneblock); - curSplitSize += oneblock.length; - - // if the accumulated split size exceeds the maximum, then - // create this split. 
- if (maxSize != 0 && curSplitSize >= maxSize) { - // create an input split and add it to the splits array - addCreatedSplit(partitions, nodes, validBlocks); - curSplitSize = 0; - validBlocks.clear(); - } - } - } - // if there were any blocks left over and their combined size is - // larger than minSplitNode, then combine them into one split. - // Otherwise add them back to the unprocessed pool. It is likely - // that they will be combined with other blocks from the - // same rack later on. - if (minSizeNode != 0 && curSplitSize >= minSizeNode) { - // create an input split and add it to the splits array - addCreatedSplit(partitions, nodes, validBlocks); - } else { - for (OneBlockInfo oneblock : validBlocks) { - blockToNodes.put(oneblock, oneblock.hosts); - } - } - validBlocks.clear(); - nodes.clear(); - curSplitSize = 0; - } - - // if blocks in a rack are below the specified minimum size, then keep them - // in 'overflow'. After the processing of all racks is complete, these - // overflow blocks will be combined into splits. - ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>(); - Set<String> racks = new HashSet<String>(); - - // Process all racks over and over again until there is no more work to do. - while (blockToNodes.size() > 0) { - - // Create one split for this rack before moving over to the next rack. - // Come back to this rack after creating a single split for each of the - // remaining racks. - // Process one rack location at a time, Combine all possible blocks that - // reside on this rack as one split. (constrained by minimum and maximum - // split size). - - // iterate over all racks - for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter = - rackToBlocks.entrySet().iterator(); iter.hasNext();) { - - Map.Entry<String, List<OneBlockInfo>> one = iter.next(); - racks.add(one.getKey()); - List<OneBlockInfo> blocks = one.getValue(); - - // for each block, copy it into validBlocks. Delete it from - // blockToNodes so that the same block does not appear in - // two different splits. - boolean createdSplit = false; - for (OneBlockInfo oneblock : blocks) { - if (blockToNodes.containsKey(oneblock)) { - validBlocks.add(oneblock); - blockToNodes.remove(oneblock); - curSplitSize += oneblock.length; - - // if the accumulated split size exceeds the maximum, then - // create this split. - if (maxSize != 0 && curSplitSize >= maxSize) { - // create an input split and add it to the splits array - addCreatedSplit(partitions, getHosts(racks), validBlocks); - createdSplit = true; - break; - } - } - } - - // if we created a split, then just go to the next rack - if (createdSplit) { - curSplitSize = 0; - validBlocks.clear(); - racks.clear(); - continue; - } - - if (!validBlocks.isEmpty()) { - if (minSizeRack != 0 && curSplitSize >= minSizeRack) { - // if there is a minimum size specified, then create a single split - // otherwise, store these blocks into overflow data structure - addCreatedSplit(partitions, getHosts(racks), validBlocks); - } else { - // There were a few blocks in this rack that - // remained to be processed. Keep them in 'overflow' block list. - // These will be combined later. 
- overflowBlocks.addAll(validBlocks); - } - } - curSplitSize = 0; - validBlocks.clear(); - racks.clear(); - } - } - - assert blockToNodes.isEmpty(); - assert curSplitSize == 0; - assert validBlocks.isEmpty(); - assert racks.isEmpty(); - - // Process all overflow blocks - for (OneBlockInfo oneblock : overflowBlocks) { - validBlocks.add(oneblock); - curSplitSize += oneblock.length; - - // This might cause an exiting rack location to be re-added, - // but it should be ok. - for (int i = 0; i < oneblock.racks.length; i++) { - racks.add(oneblock.racks[i]); - } - - // if the accumulated split size exceeds the maximum, then - // create this split. - if (maxSize != 0 && curSplitSize >= maxSize) { - // create an input split and add it to the splits array - addCreatedSplit(partitions, getHosts(racks), validBlocks); - curSplitSize = 0; - validBlocks.clear(); - racks.clear(); - } - } - - // Process any remaining blocks, if any. - if (!validBlocks.isEmpty()) { - addCreatedSplit(partitions, getHosts(racks), validBlocks); - } - } - - private boolean isSplitable(Configuration conf, Path file) { - final CompressionCodec codec = - new CompressionCodecFactory(conf).getCodec(file); - - // This method might be improved for SplittableCompression codec when we - // drop support for Hadoop 1.0 - return null == codec; - - } - - /** - * Create a single split from the list of blocks specified in validBlocks - * Add this new split into list. - */ - private void addCreatedSplit(List<Partition> partitions, - Collection<String> locations, - ArrayList<OneBlockInfo> validBlocks) { - // create an input split - Path[] files = new Path[validBlocks.size()]; - long[] offsets = new long[validBlocks.size()]; - long[] lengths = new long[validBlocks.size()]; - for (int i = 0; i < validBlocks.size(); i++) { - files[i] = validBlocks.get(i).onepath; - offsets[i] = validBlocks.get(i).offset; - lengths[i] = validBlocks.get(i).length; - } - - // add this split to the list that is returned - HdfsExportPartition partition = new HdfsExportPartition( - files, offsets, lengths, locations.toArray(new String[0])); - partitions.add(partition); - } - - private Set<String> getHosts(Set<String> racks) { - Set<String> hosts = new HashSet<String>(); - for (String rack : racks) { - if (rackToNodes.containsKey(rack)) { - hosts.addAll(rackToNodes.get(rack)); - } - } - return hosts; - } - - private static void addHostToRack(HashMap<String, Set<String>> rackToNodes, - String rack, String host) { - Set<String> hosts = rackToNodes.get(rack); - if (hosts == null) { - hosts = new HashSet<String>(); - rackToNodes.put(rack, hosts); - } - hosts.add(host); - } - - /** - * information about one file from the File System - */ - private static class OneFileInfo { - private long fileSize; // size of the file - private OneBlockInfo[] blocks; // all blocks in this file - - OneFileInfo(Path path, Configuration conf, - boolean isSplitable, - HashMap<String, List<OneBlockInfo>> rackToBlocks, - HashMap<OneBlockInfo, String[]> blockToNodes, - HashMap<String, List<OneBlockInfo>> nodeToBlocks, - HashMap<String, Set<String>> rackToNodes, - long maxSize) - throws IOException { - this.fileSize = 0; - - // get block locations from file system - FileSystem fs = path.getFileSystem(conf); - FileStatus stat = fs.getFileStatus(path); - BlockLocation[] locations = fs.getFileBlockLocations(stat, 0, - stat.getLen()); - // create a list of all block and their locations - if (locations == null) { - blocks = new OneBlockInfo[0]; - } else { - if (!isSplitable) { - // if the file is not 
splitable, just create the one block with - // full file length - blocks = new OneBlockInfo[1]; - fileSize = stat.getLen(); - blocks[0] = new OneBlockInfo(path, 0, fileSize, locations[0] - .getHosts(), locations[0].getTopologyPaths()); - } else { - ArrayList<OneBlockInfo> blocksList = new ArrayList<OneBlockInfo>( - locations.length); - for (int i = 0; i < locations.length; i++) { - fileSize += locations[i].getLength(); - - // each split can be a maximum of maxSize - long left = locations[i].getLength(); - long myOffset = locations[i].getOffset(); - long myLength = 0; - do { - if (maxSize == 0) { - myLength = left; - } else { - if (left > maxSize && left < 2 * maxSize) { - // if remainder is between max and 2*max - then - // instead of creating splits of size max, left-max we - // create splits of size left/2 and left/2. This is - // a heuristic to avoid creating really really small - // splits. - myLength = left / 2; - } else { - myLength = Math.min(maxSize, left); - } - } - OneBlockInfo oneblock = new OneBlockInfo(path, myOffset, - myLength, locations[i].getHosts(), locations[i] - .getTopologyPaths()); - left -= myLength; - myOffset += myLength; - - blocksList.add(oneblock); - } while (left > 0); - } - blocks = blocksList.toArray(new OneBlockInfo[blocksList.size()]); - } - - for (OneBlockInfo oneblock : blocks) { - // add this block to the block --> node locations map - blockToNodes.put(oneblock, oneblock.hosts); - - // For blocks that do not have host/rack information, - // assign to default rack. - String[] racks = null; - if (oneblock.hosts.length == 0) { - racks = new String[]{NetworkTopology.DEFAULT_RACK}; - } else { - racks = oneblock.racks; - } - - // add this block to the rack --> block map - for (int j = 0; j < racks.length; j++) { - String rack = racks[j]; - List<OneBlockInfo> blklist = rackToBlocks.get(rack); - if (blklist == null) { - blklist = new ArrayList<OneBlockInfo>(); - rackToBlocks.put(rack, blklist); - } - blklist.add(oneblock); - if (!racks[j].equals(NetworkTopology.DEFAULT_RACK)) { - // Add this host to rackToNodes map - addHostToRack(rackToNodes, racks[j], oneblock.hosts[j]); - } - } - - // add this block to the node --> block map - for (int j = 0; j < oneblock.hosts.length; j++) { - String node = oneblock.hosts[j]; - List<OneBlockInfo> blklist = nodeToBlocks.get(node); - if (blklist == null) { - blklist = new ArrayList<OneBlockInfo>(); - nodeToBlocks.put(node, blklist); - } - blklist.add(oneblock); - } - } - } - } - - } - - /** - * information about one block from the File System - */ - private static class OneBlockInfo { - Path onepath; // name of this file - long offset; // offset in file - long length; // length of this block - String[] hosts; // nodes on which this block resides - String[] racks; // network topology of hosts - - OneBlockInfo(Path path, long offset, long len, - String[] hosts, String[] topologyPaths) { - this.onepath = path; - this.offset = offset; - this.hosts = hosts; - this.length = len; - assert (hosts.length == topologyPaths.length || - topologyPaths.length == 0); - - // if the file system does not have any rack information, then - // use dummy rack location. - if (topologyPaths.length == 0) { - topologyPaths = new String[hosts.length]; - for (int i = 0; i < topologyPaths.length; i++) { - topologyPaths[i] = (new NodeBase(hosts[i], - NetworkTopology.DEFAULT_RACK)).toString(); - } - } - - // The topology paths have the host name included as the last - // component. Strip it. 
- this.racks = new String[topologyPaths.length]; - for (int i = 0; i < topologyPaths.length; i++) { - this.racks[i] = (new NodeBase(topologyPaths[i])).getNetworkLocation(); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java deleted file mode 100644 index d4ffb13..0000000 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java +++ /dev/null @@ -1,94 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sqoop.job.etl; - -import java.io.IOException; - -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.SequenceFile.CompressionType; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.sqoop.common.SqoopException; -import org.apache.sqoop.job.JobConstants; -import org.apache.sqoop.job.MapreduceExecutionError; -import org.apache.sqoop.etl.io.DataReader; -import org.apache.sqoop.utils.ClassUtils; - -public class HdfsSequenceImportLoader extends Loader { - - public static final String EXTENSION = ".seq"; - - @Override - public void load(LoaderContext context, Object oc, Object oj) throws Exception { - DataReader reader = context.getDataReader(); - - Configuration conf = new Configuration(); -// Configuration conf = ((EtlContext)context).getConfiguration(); - String filename = context.getString(JobConstants.JOB_MR_OUTPUT_FILE); - String codecname = context.getString(JobConstants.JOB_MR_OUTPUT_CODEC); - - CompressionCodec codec = null; - if (codecname != null) { - Class<?> clz = ClassUtils.loadClass(codecname); - if (clz == null) { - throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0009, codecname); - } - - try { - codec = (CompressionCodec) clz.newInstance(); - if (codec instanceof Configurable) { - ((Configurable) codec).setConf(conf); - } - } catch (Exception e) { - throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0010, codecname, e); - } - } - - filename += EXTENSION; - - try { - Path filepath = new Path(filename); - SequenceFile.Writer filewriter; - if (codec != null) { - filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf), - conf, filepath, Text.class, NullWritable.class, - CompressionType.BLOCK, codec); - } else { - filewriter 
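
For reviewers tracing the deleted partitioner above: the do/while loop in OneFileInfo carves each block location into chunks of at most maxSize, and when the remainder lands between maxSize and 2*maxSize it emits two equal halves rather than a full chunk plus a tiny sliver. A minimal standalone sketch of just that heuristic (the class and method names here are hypothetical, not part of the patch):

    import java.util.ArrayList;
    import java.util.List;

    public class SplitCarvingSketch {
      // Carve a block of `length` bytes into chunk sizes, mirroring the
      // heuristic in OneFileInfo: never exceed maxSize, and split a
      // remainder in (maxSize, 2*maxSize) into two halves so no tiny
      // trailing split is created. maxSize == 0 means "no limit".
      static List<Long> carve(long length, long maxSize) {
        List<Long> chunks = new ArrayList<Long>();
        long left = length;
        do {
          long myLength;
          if (maxSize == 0) {
            myLength = left;
          } else if (left > maxSize && left < 2 * maxSize) {
            myLength = left / 2;  // e.g. 700 with maxSize 450 -> 350 + 350
          } else {
            myLength = Math.min(maxSize, left);
          }
          chunks.add(myLength);
          left -= myLength;
        } while (left > 0);
        return chunks;
      }

      public static void main(String[] args) {
        // prints [450, 275, 275] rather than the naive [450, 450, 100]
        System.out.println(carve(1000, 450));
      }
    }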

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
deleted file mode 100644
index d4ffb13..0000000
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.etl;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.job.JobConstants;
-import org.apache.sqoop.job.MapreduceExecutionError;
-import org.apache.sqoop.etl.io.DataReader;
-import org.apache.sqoop.utils.ClassUtils;
-
-public class HdfsSequenceImportLoader extends Loader {
-
-  public static final String EXTENSION = ".seq";
-
-  @Override
-  public void load(LoaderContext context, Object oc, Object oj) throws Exception {
-    DataReader reader = context.getDataReader();
-
-    Configuration conf = new Configuration();
-//    Configuration conf = ((EtlContext)context).getConfiguration();
-    String filename = context.getString(JobConstants.JOB_MR_OUTPUT_FILE);
-    String codecname = context.getString(JobConstants.JOB_MR_OUTPUT_CODEC);
-
-    CompressionCodec codec = null;
-    if (codecname != null) {
-      Class<?> clz = ClassUtils.loadClass(codecname);
-      if (clz == null) {
-        throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0009, codecname);
-      }
-
-      try {
-        codec = (CompressionCodec) clz.newInstance();
-        if (codec instanceof Configurable) {
-          ((Configurable) codec).setConf(conf);
-        }
-      } catch (Exception e) {
-        throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0010, codecname, e);
-      }
-    }
-
-    filename += EXTENSION;
-
-    try {
-      Path filepath = new Path(filename);
-      SequenceFile.Writer filewriter;
-      if (codec != null) {
-        filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
-            conf, filepath, Text.class, NullWritable.class,
-            CompressionType.BLOCK, codec);
-      } else {
-        filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
-            conf, filepath, Text.class, NullWritable.class, CompressionType.NONE);
-      }
-
-      String csv;
-      Text text = new Text();
-      while ((csv = reader.readTextRecord()) != null) {
-        text.set(csv);
-        filewriter.append(text, NullWritable.get());
-      }
-      filewriter.close();
-
-    } catch (IOException e) {
-      throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0018, e);
-    }
-  }
-}
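
The deleted HdfsSequenceImportLoader wrote each CSV record as a Text key paired with a NullWritable value. A file produced that way can be read back with the stock SequenceFile.Reader, which handles any block compression transparently. A hypothetical read-back sketch (class name and argument handling are illustrative, not part of the patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;

    public class SeqFileReadSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path(args[0]);  // a file the loader wrote, ending in .seq
        FileSystem fs = path.getFileSystem(conf);

        // Key/value types must match what the loader wrote: Text / NullWritable.
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
        try {
          Text record = new Text();
          while (reader.next(record, NullWritable.get())) {
            System.out.println(record);  // one CSV record per entry
          }
        } finally {
          reader.close();
        }
      }
    }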

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
deleted file mode 100644
index 7b799ca..0000000
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.etl;
-
-import java.io.BufferedWriter;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-
-import com.google.common.base.Charsets;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.job.JobConstants;
-import org.apache.sqoop.job.MapreduceExecutionError;
-import org.apache.sqoop.job.io.Data;
-import org.apache.sqoop.etl.io.DataReader;
-import org.apache.sqoop.utils.ClassUtils;
-
-public class HdfsTextImportLoader extends Loader {
-
-  private final char recordDelimiter;
-
-  public HdfsTextImportLoader() {
-    recordDelimiter = Data.DEFAULT_RECORD_DELIMITER;
-  }
-
-  @Override
-  public void load(LoaderContext context, Object oc, Object oj) throws Exception {
-    DataReader reader = context.getDataReader();
-
-    Configuration conf = new Configuration();
-//    Configuration conf = ((EtlContext)context).getConfiguration();
-    String filename = context.getString(JobConstants.JOB_MR_OUTPUT_FILE);
-    String codecname = context.getString(JobConstants.JOB_MR_OUTPUT_CODEC);
-
-    CompressionCodec codec = null;
-    if (codecname != null) {
-      Class<?> clz = ClassUtils.loadClass(codecname);
-      if (clz == null) {
-        throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0009, codecname);
-      }
-
-      try {
-        codec = (CompressionCodec) clz.newInstance();
-        if (codec instanceof Configurable) {
-          ((Configurable) codec).setConf(conf);
-        }
-      } catch (Exception e) {
-        throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0010, codecname, e);
-      }
-
-      filename += codec.getDefaultExtension();
-    }
-
-    try {
-      Path filepath = new Path(filename);
-      FileSystem fs = filepath.getFileSystem(conf);
-
-      BufferedWriter filewriter;
-      DataOutputStream filestream = fs.create(filepath, false);
-      if (codec != null) {
-        filewriter = new BufferedWriter(new OutputStreamWriter(
-            codec.createOutputStream(filestream, codec.createCompressor()),
-            Charsets.UTF_8));
-      } else {
-        filewriter = new BufferedWriter(new OutputStreamWriter(
-            filestream, Charsets.UTF_8));
-      }
-
-      String csv;
-      while ((csv = reader.readTextRecord()) != null) {
-        filewriter.write(csv + recordDelimiter);
-      }
-      filewriter.close();
-
-    } catch (IOException e) {
-      throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0018, e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
index 8e31ef5..59431f4 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
@@ -21,7 +21,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.Direction;
 import org.apache.sqoop.job.JobConstants;
-import org.apache.sqoop.job.PrefixContext;
+import org.apache.sqoop.common.PrefixContext;
 import org.apache.sqoop.job.etl.Destroyer;
 import org.apache.sqoop.job.etl.DestroyerContext;
 import org.apache.sqoop.schema.Schema;
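
Both deleted loaders share the same reflective codec bootstrap: load the codec class by name, instantiate it, and hand it the job Configuration when it implements Configurable. The sketch below mirrors that pattern with plain reflection; Class.forName stands in for Sqoop's ClassUtils.loadClass (which, per the null check above, returns null instead of throwing), and the class and codec names are only examples:

    import org.apache.hadoop.conf.Configurable;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.compress.CompressionCodec;

    public class CodecLoadSketch {
      // Instantiate a compression codec by class name, following the
      // pattern in the deleted loaders.
      static CompressionCodec loadCodec(String codecname, Configuration conf)
          throws Exception {
        Class<?> clz = Class.forName(codecname);
        CompressionCodec codec = (CompressionCodec) clz.newInstance();
        // Hadoop codecs typically implement Configurable and need a conf
        // before use (e.g. to pick up buffer-size settings).
        if (codec instanceof Configurable) {
          ((Configurable) codec).setConf(conf);
        }
        return codec;
      }

      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        CompressionCodec codec =
            loadCodec("org.apache.hadoop.io.compress.DefaultCodec", conf);
        // The text loader appended codec.getDefaultExtension() to the filename:
        System.out.println(codec.getDefaultExtension());  // prints ".deflate"
      }
    }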
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
index e96909a..1c1133a 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
@@ -33,7 +33,7 @@ import org.apache.sqoop.common.Direction;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.job.JobConstants;
 import org.apache.sqoop.job.MapreduceExecutionError;
-import org.apache.sqoop.job.PrefixContext;
+import org.apache.sqoop.common.PrefixContext;
 import org.apache.sqoop.job.etl.Partition;
 import org.apache.sqoop.job.etl.Partitioner;
 import org.apache.sqoop.job.etl.PartitionerContext;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
index 6e2cfbf..1d60ba3 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
@@ -31,7 +31,7 @@ import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.idf.IntermediateDataFormat;
 import org.apache.sqoop.job.JobConstants;
 import org.apache.sqoop.job.MapreduceExecutionError;
-import org.apache.sqoop.job.PrefixContext;
+import org.apache.sqoop.common.PrefixContext;
 import org.apache.sqoop.job.etl.Extractor;
 import org.apache.sqoop.job.etl.ExtractorContext;
 import org.apache.sqoop.etl.io.DataWriter;
@@ -66,7 +66,16 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable,
     // Propagate connector schema in every case for now
     // TODO: Change to coditional choosing between Connector schemas.
+    Schema schema = ConfigurationUtils.getConnectorSchema(Direction.FROM, conf);
+    if (schema==null) {
+      schema = ConfigurationUtils.getConnectorSchema(Direction.TO, conf);
+    }
+
+    if (schema==null) {
+      LOG.info("setting an empty schema");
+    }
+
     String intermediateDataFormatName = conf.get(JobConstants
       .INTERMEDIATE_DATA_FORMAT);
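
The new SqoopMapper hunk resolves the job schema by preferring the FROM direction and falling back to the TO direction, logging when neither side supplies one. Folded into a single hypothetical helper for clarity (resolveSchema does not exist in the patch; the enclosing class, LOG field, and ConfigurationUtils import are assumed from the surrounding context):

    private static Schema resolveSchema(Configuration conf) {
      // Prefer the schema published by the FROM connector...
      Schema schema = ConfigurationUtils.getConnectorSchema(Direction.FROM, conf);
      // ...and fall back to the TO connector when the FROM side has none.
      if (schema == null) {
        schema = ConfigurationUtils.getConnectorSchema(Direction.TO, conf);
      }
      if (schema == null) {
        LOG.info("setting an empty schema");
      }
      return schema;  // may still be null; the caller treats that as "empty"
    }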
