This is an automated email from the ASF dual-hosted git repository.
hansva pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hop.git
The following commit(s) were added to refs/heads/master by this push:
new 604ca8446b Redshift bulk loader minor tweaks, updates. #3281
new f7a357bbec Merge pull request #3319 from bamaer/3281
604ca8446b is described below
commit 604ca8446b221d554bd5c6b04ff1a6bd0d5e13e9
Author: Bart Maertens <[email protected]>
AuthorDate: Wed Oct 25 17:30:13 2023 +0200
Redshift bulk loader minor tweaks, updates. #3281
---
.../redshift/bulkloader/RedshiftBulkLoader.java | 32 ++++++++++++----------
.../bulkloader/RedshiftBulkLoaderMeta.java | 2 +-
2 files changed, 19 insertions(+), 15 deletions(-)
diff --git a/plugins/tech/aws/src/main/java/org/apache/hop/pipeline/transforms/redshift/bulkloader/RedshiftBulkLoader.java b/plugins/tech/aws/src/main/java/org/apache/hop/pipeline/transforms/redshift/bulkloader/RedshiftBulkLoader.java
index cca06cc230..ea0be4bbd4 100644
--- a/plugins/tech/aws/src/main/java/org/apache/hop/pipeline/transforms/redshift/bulkloader/RedshiftBulkLoader.java
+++ b/plugins/tech/aws/src/main/java/org/apache/hop/pipeline/transforms/redshift/bulkloader/RedshiftBulkLoader.java
@@ -146,7 +146,7 @@ public class RedshiftBulkLoader extends BaseTransform<RedshiftBulkLoaderMeta, Re
}
data.outputRowMeta = getInputRowMeta().clone();
- meta.getFields(data.outputRowMeta, getTransformName(), null, null, this,
metadataProvider);
+ meta.getFields(data.insertRowMeta, getTransformName(), null, null, this,
metadataProvider);
if(meta.isStreamToS3Csv()){
@@ -170,7 +170,7 @@ public class RedshiftBulkLoader extends BaseTransform<RedshiftBulkLoaderMeta, Re
for (int i = 0; i < meta.getFields().size(); i++) {
int streamFieldLocation =
- data.outputRowMeta.indexOfValue(
+ data.insertRowMeta.indexOfValue(
meta.getFields().get(i).getStreamField());
if (streamFieldLocation < 0) {
throw new HopTransformException(
@@ -208,10 +208,10 @@ public class RedshiftBulkLoader extends BaseTransform<RedshiftBulkLoaderMeta, Re
// Cache the position of the selected fields in the row array
data.selectedRowFieldIndices = new int[numberOfInsertFields];
- for(int i=0; i < data.dbFields.size(); i++){
+ for(int i=0; i < meta.getFields().size(); i++){
RedshiftBulkLoaderField vbf = meta.getFields().get(i);
String inputFieldName = vbf.getStreamField();
- int inputFieldIdx = getInputRowMeta().indexOfValue(inputFieldName);
+ int inputFieldIdx = i;
if (inputFieldIdx < 0) {
throw new HopTransformException(
BaseMessages.getString(
@@ -239,7 +239,7 @@ public class RedshiftBulkLoader extends BaseTransform<RedshiftBulkLoaderMeta, Re
}
if(meta.isStreamToS3Csv()){
- writeRowToFile(data.insertRowMeta, r);
+ writeRowToFile(data.outputRowMeta, r);
putRow(data.outputRowMeta, r);
}
@@ -287,12 +287,13 @@ public class RedshiftBulkLoader extends BaseTransform<RedshiftBulkLoaderMeta, Re
if(meta.isStreamToS3Csv() ||
meta.getLoadFromExistingFileFormat().equals("CSV")){
sb.append(" (");
- final IRowMeta fields = data.outputRowMeta;
- for (int i = 0; i < fields.size(); i++) {
- if (i > 0) {
- sb.append(", " + fields.getValueMeta(i).getName());
+ List<RedshiftBulkLoaderField> fieldList = meta.getFields();
+ for(int i=0; i < fieldList.size(); i++){
+ RedshiftBulkLoaderField field = fieldList.get(i);
+ if( i > 0){
+ sb.append(", " + field.getDatabaseField());
}else{
- sb.append(fields.getValueMeta(i).getName());
+ sb.append(field.getDatabaseField());
}
}
sb.append(")");
@@ -302,7 +303,8 @@ public class RedshiftBulkLoader extends BaseTransform<RedshiftBulkLoaderMeta, Re
sb.append(" NULL '' ");
sb.append(" EMPTYASNULL ");
if(meta.isStreamToS3Csv() ||
meta.getLoadFromExistingFileFormat().equals("CSV")){
- sb.append(" delimiter ','");
+ sb.append(" DELIMITER ',' ");
+ sb.append(" CSV QUOTE AS '\"'");
}
if(meta.isUseAwsIamRole()){
sb.append(" iam_role '" + meta.getAwsIamRole() + "'");
@@ -477,10 +479,10 @@ public class RedshiftBulkLoader extends BaseTransform<RedshiftBulkLoaderMeta, Re
if(streamIndex >= 0){
if(needConversion){
IValueMeta valueMeta = rowMeta.getValueMeta(streamIndex);
- Object obj = row[i];
+ Object obj = row[streamIndex];
valueData = v.convertData(valueMeta, obj);
}else{
- valueData = row[i];
+ valueData = row[streamIndex];
}
} else if (meta.isErrorColumnMismatch()) {
throw new HopException(
@@ -495,7 +497,7 @@ public class RedshiftBulkLoader extends BaseTransform<RedshiftBulkLoaderMeta, Re
} else {
int jsonField = data.fieldnrs.get("json");
data.writer.write(
- data.outputRowMeta.getString(row,
jsonField).getBytes(StandardCharsets.UTF_8));
+ data.insertRowMeta.getString(row,
jsonField).getBytes(StandardCharsets.UTF_8));
data.writer.write(data.binaryNewline);
}
} catch (Exception e) {
@@ -530,6 +532,8 @@ public class RedshiftBulkLoader extends BaseTransform<RedshiftBulkLoaderMeta, Re
boolean writeEnclosures = false;
if (v.isString()) {
+ writeEnclosures = true;
+
if (containsSeparatorOrEnclosure(
str, data.binarySeparator, data.binaryEnclosure,
data.escapeCharacters)) {
writeEnclosures = true;
diff --git a/plugins/tech/aws/src/main/java/org/apache/hop/pipeline/transforms/redshift/bulkloader/RedshiftBulkLoaderMeta.java b/plugins/tech/aws/src/main/java/org/apache/hop/pipeline/transforms/redshift/bulkloader/RedshiftBulkLoaderMeta.java
index 5c8503c2b1..abec768573 100644
--- a/plugins/tech/aws/src/main/java/org/apache/hop/pipeline/transforms/redshift/bulkloader/RedshiftBulkLoaderMeta.java
+++ b/plugins/tech/aws/src/main/java/org/apache/hop/pipeline/transforms/redshift/bulkloader/RedshiftBulkLoaderMeta.java
@@ -63,7 +63,7 @@ public class RedshiftBulkLoaderMeta
public static final String CSV_DELIMITER = ",";
public static final String CSV_RECORD_DELIMITER = "\n";
- public static final String CSV_ESCAPE_CHAR = "\\";
+ public static final String CSV_ESCAPE_CHAR = "\"";
public static final String ENCLOSURE = "\"";
@HopMetadataProperty(