Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 461999eda -> 3323543b6


[GOBBLIN-459] Support encryption and decryption of strings + arrays of strings

Closes #2330 from eogren/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/3323543b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/3323543b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/3323543b

Branch: refs/heads/master
Commit: 3323543b6cfb5e5cfbfbea85783a5976f3a76267
Parents: 461999e
Author: Eric Ogren <eog...@linkedin.com>
Authored: Sun Apr 8 23:04:42 2018 -0700
Committer: Hung Tran <hut...@linkedin.com>
Committed: Sun Apr 8 23:04:42 2018 -0700

----------------------------------------------------------------------
 .../gobblin/recordaccess/RecordAccessor.java    |   7 +
 .../recordaccess/AvroGenericRecordAccessor.java |  60 +++++++-
 .../AvroGenericRecordAccessorTest.java          |  49 ++++++-
 .../RecordAccessorProviderFactoryTest.java      |  16 +++
 .../resources/converter/fieldPickInput.avsc     |   3 +-
 .../converter/fieldPickInput_arrays.avro        | Bin 0 -> 552 bytes
 .../AvroStringFieldDecryptorConverter.java      |  34 +++++
 .../StringFieldDecryptorConverter.java          | 131 +++++++++++++++++
 .../StringFieldEncryptorConverter.java          |  45 ++++--
 .../AvroStringFieldDecryptorConverterTest.java  | 141 +++++++++++++++++++
 .../AvroStringFieldEncryptorConverterTest.java  |  62 +++++++-
 .../test/resources/fieldPickInput_arrays.avro   | Bin 0 -> 552 bytes
 12 files changed, 525 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3323543b/gobblin-api/src/main/java/org/apache/gobblin/recordaccess/RecordAccessor.java
----------------------------------------------------------------------
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/recordaccess/RecordAccessor.java 
b/gobblin-api/src/main/java/org/apache/gobblin/recordaccess/RecordAccessor.java
index 5791771..10f2fb3 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/recordaccess/RecordAccessor.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/recordaccess/RecordAccessor.java
@@ -16,6 +16,7 @@
  */
 package org.apache.gobblin.recordaccess;
 
+import java.util.List;
 import java.util.Map;
 
 import org.apache.gobblin.annotation.Alpha;
@@ -72,14 +73,19 @@ public interface RecordAccessor {
    * IncorrectTypeException if the underlying types do not match. Getters 
should not
    * try to do any type coercion -- for example, getAsInt for a value that is 
the string "1"
    * should throw a Sch.
+   *
+   * The get*Generic functions should return the following object types:
+   *   String, Integer, Long, or a List of them.
    */
   Map<String, String> getMultiAsString(String fieldName);
   Map<String, Integer> getMultiAsInt(String fieldName);
   Map<String, Long> getMultiAsLong(String fieldName);
+  Map<String, Object> getMultiGeneric(String fieldName);
 
   String getAsString(String fieldName);
   Integer getAsInt(String fieldName);
   Long getAsLong(String fieldName);
+  Object getGeneric(String fieldName);
 
   /*
    * Set new values for an object. Should throw a FieldDoesNotExistException 
runtime exception if fieldName
@@ -88,6 +94,7 @@ public interface RecordAccessor {
   void set(String fieldName, String value);
   void set(String fieldName, Integer value);
   void set(String fieldName, Long value);
+  void setStringArray(String fieldName, List<String> value);
   void setToNull(String fieldName);
 
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3323543b/gobblin-core/src/main/java/org/apache/gobblin/recordaccess/AvroGenericRecordAccessor.java
----------------------------------------------------------------------
diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/recordaccess/AvroGenericRecordAccessor.java
 
b/gobblin-core/src/main/java/org/apache/gobblin/recordaccess/AvroGenericRecordAccessor.java
index d51b5db..620933d 100644
--- 
a/gobblin-core/src/main/java/org/apache/gobblin/recordaccess/AvroGenericRecordAccessor.java
+++ 
b/gobblin-core/src/main/java/org/apache/gobblin/recordaccess/AvroGenericRecordAccessor.java
@@ -16,6 +16,7 @@
  */
 package org.apache.gobblin.recordaccess;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -23,6 +24,7 @@ import java.util.Map;
 
 import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericArray;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.util.Utf8;
@@ -48,7 +50,7 @@ public class AvroGenericRecordAccessor implements 
RecordAccessor {
 
   @Override
   public Map<String, String> getMultiAsString(String fieldName) {
-    Map<String, Object> vals = getMultiAsObject(fieldName);
+    Map<String, Object> vals = getMultiGeneric(fieldName);
     Map<String, String> ret = new HashMap<>();
 
     for (Map.Entry<String, Object> entry : vals.entrySet()) {
@@ -69,9 +71,12 @@ public class AvroGenericRecordAccessor implements 
RecordAccessor {
     return convertToString(fieldName, obj);
   }
 
+
   private String convertToString(String fieldName, Object obj) {
     if (obj == null) {
       return null;
+    } else if (obj instanceof String) {
+      return (String)obj;
     } else if (obj instanceof Utf8) {
       return obj.toString();
     } else {
@@ -81,7 +86,7 @@ public class AvroGenericRecordAccessor implements 
RecordAccessor {
 
   @Override
   public Map<String, Integer> getMultiAsInt(String fieldName) {
-    Map<String, Object> vals = getMultiAsObject(fieldName);
+    Map<String, Object> vals = getMultiGeneric(fieldName);
     Map<String, Integer> ret = new HashMap<>();
 
     for (Map.Entry<String, Object> entry : vals.entrySet()) {
@@ -107,7 +112,7 @@ public class AvroGenericRecordAccessor implements 
RecordAccessor {
 
   @Override
   public Map<String, Long> getMultiAsLong(String fieldName) {
-    Map<String, Object> vals = getMultiAsObject(fieldName);
+    Map<String, Object> vals = getMultiGeneric(fieldName);
     Map<String, Long> ret = new HashMap<>();
 
     for (Map.Entry<String, Object> entry : vals.entrySet()) {
@@ -151,8 +156,36 @@ public class AvroGenericRecordAccessor implements 
RecordAccessor {
     return obj.isPresent() ? obj.get() : null;
   }
 
-  private Map<String, Object> getMultiAsObject(String fieldName) {
-    return AvroUtils.getMultiFieldValue(record, fieldName);
+  @Override
+  public Map<String, Object> getMultiGeneric(String fieldName) {
+    Map<String, Object> vals = AvroUtils.getMultiFieldValue(record, fieldName);
+    for (Map.Entry<String, Object> entry : vals.entrySet()) {
+      // Convert each Avro value to its Java form; setValue is the map write defined during entrySet() iteration.
+      entry.setValue(convertAvroToJava(entry.getKey(), entry.getValue()));
+    }
+    return vals;
+  }
+
+  @Override
+  public Object getGeneric(String fieldName) {
+    Object val = getAsObject(fieldName);
+    return convertAvroToJava(fieldName, val);
+  }
+
+  private Object convertAvroToJava(String fieldName, Object val) {
+    if (val == null || val instanceof String || val instanceof Long || val 
instanceof Integer) {
+      return val;
+    }
+
+    if (val instanceof Utf8) {
+      return convertToString(fieldName, val);
+    }
+
+    if (val instanceof GenericArray) {
+      return convertToList(fieldName, (GenericArray) val);
+    }
+
+    throw new IllegalArgumentException("Don't know how to parse object of type 
" + val.getClass().getCanonicalName());
   }
 
   @Override
@@ -171,6 +204,13 @@ public class AvroGenericRecordAccessor implements 
RecordAccessor {
   }
 
   @Override
+  public void setStringArray(String fieldName, List<String> value) {
+    GenericData.Array<String> avroArray = new GenericData.Array<>(
+        Schema.createArray(Schema.create(Schema.Type.STRING)), value);
+    set(fieldName, avroArray);
+  }
+
+  @Override
   public void setToNull(String fieldName) {
     set(fieldName, (Object) null);
   }
@@ -225,4 +265,14 @@ public class AvroGenericRecordAccessor implements 
RecordAccessor {
       throw new FieldDoesNotExistException("Field not found setting name " + 
fieldName, e);
     }
   }
+
+  @SuppressWarnings("unchecked")
+  private List convertToList(String fieldName, GenericArray arr) {
+    List ret = new ArrayList();
+    for (int i = 0; i < arr.size(); i++) {
+      ret.add(convertAvroToJava(fieldName + "." + String.valueOf(i), 
arr.get(i)));
+    }
+
+    return ret;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3323543b/gobblin-core/src/test/java/org/apache/gobblin/recordaccess/AvroGenericRecordAccessorTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-core/src/test/java/org/apache/gobblin/recordaccess/AvroGenericRecordAccessorTest.java
 
b/gobblin-core/src/test/java/org/apache/gobblin/recordaccess/AvroGenericRecordAccessorTest.java
index ed43aee..17e46b0 100644
--- 
a/gobblin-core/src/test/java/org/apache/gobblin/recordaccess/AvroGenericRecordAccessorTest.java
+++ 
b/gobblin-core/src/test/java/org/apache/gobblin/recordaccess/AvroGenericRecordAccessorTest.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.recordaccess;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.avro.Schema;
@@ -31,12 +32,15 @@ import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.util.Utf8;
 import org.testng.Assert;
 import org.testng.ITestResult;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import com.google.common.collect.ImmutableList;
+
 
 public class AvroGenericRecordAccessorTest {
   private Schema recordSchema;
@@ -112,6 +116,39 @@ public class AvroGenericRecordAccessorTest {
     
Assert.assertEquals(accessor.getAsString("nestedRecords.1.fieldToEncrypt"), 
"val1");
   }
 
+  @Test
+  public void testSetStringArray() throws IOException {
+    List<String> quotes = ImmutableList.of("abracadabra", "hocuspocus");
+    accessor.setStringArray("favorite_quotes", quotes);
+
+    Assert.assertEquals(accessor.getGeneric("favorite_quotes"), quotes);
+  }
+
+  @Test
+  public void testGetStringArrayUtf8() throws IOException {
+    // Expectation: Even though we read an Avro object with UTF8 underneath, 
the accessor converts it into a
+    // Java String
+    List<String> expectedQuotes = ImmutableList.of("abc", "defg");
+
+    GenericData.Array<Utf8> strings = new GenericData.Array<Utf8>(2, 
Schema.createArray(Schema.create(Schema.Type.STRING)));
+    expectedQuotes.forEach(s -> strings.add(new Utf8(s)));
+    record.put("favorite_quotes", strings);
+
+    Assert.assertEquals(accessor.getGeneric("favorite_quotes"), 
expectedQuotes);
+  }
+
+  @Test
+  public void testGetMultiConvertsStrings() throws IOException {
+    updateRecordFromTestResource("converter/fieldPickInput", "converter/fieldPickInput_arrays.avro");
+    Map<String, Object> ret = accessor.getMultiGeneric("favorite_quotes");
+    Object val = ret.get("favorite_quotes");
+
+    Assert.assertTrue(val instanceof List);
+    List castedVal = (List)val;
+    // TestNG's assertEquals takes (actual, expected), as the other tests in this class do.
+    Assert.assertEquals(castedVal.size(), 2);
+    Assert.assertEquals(castedVal.get(0), "hello world");
+    Assert.assertEquals(castedVal.get(1), "foobar");
+  }
 
   @Test
   public void testSetValueFromArray() throws IOException {
@@ -184,15 +221,23 @@ public class AvroGenericRecordAccessorTest {
     record.put("created", 0L);
   }
 
-  private void updateRecordFromTestResource(String resourceName)
+  private void updateRecordFromTestResource(String resourceName) throws 
IOException {
+    updateRecordFromTestResource(resourceName, null);
+  }
+
+  private void updateRecordFromTestResource(String resourceName, String 
avroFileName)
       throws IOException {
+    if (avroFileName == null) {
+      avroFileName = resourceName + ".avro";
+    }
+
     recordSchema = new Schema.Parser().parse(
         getClass().getClassLoader().getResourceAsStream(resourceName + ".avsc")
     );
 
     DatumReader<GenericRecord> reader = new GenericDatumReader<>(recordSchema);
     DataFileReader<GenericRecord> dataFileReader = new 
DataFileReader<GenericRecord>(
-        new File(getClass().getClassLoader().getResource(resourceName + 
".avro").getPath()), reader);
+        new 
File(getClass().getClassLoader().getResource(avroFileName).getPath()), reader);
 
     Assert.assertTrue(dataFileReader.hasNext());
     record = dataFileReader.next(record);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3323543b/gobblin-core/src/test/java/org/apache/gobblin/recordaccess/RecordAccessorProviderFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-core/src/test/java/org/apache/gobblin/recordaccess/RecordAccessorProviderFactoryTest.java
 
b/gobblin-core/src/test/java/org/apache/gobblin/recordaccess/RecordAccessorProviderFactoryTest.java
index a6a8ce0..3ed37f4 100644
--- 
a/gobblin-core/src/test/java/org/apache/gobblin/recordaccess/RecordAccessorProviderFactoryTest.java
+++ 
b/gobblin-core/src/test/java/org/apache/gobblin/recordaccess/RecordAccessorProviderFactoryTest.java
@@ -17,6 +17,7 @@
 package org.apache.gobblin.recordaccess;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.avro.Schema;
@@ -121,6 +122,21 @@ public class RecordAccessorProviderFactoryTest {
     }
 
     @Override
+    public Map<String, Object> getMultiGeneric(String fieldName) {
+      return null;
+    }
+
+    @Override
+    public Object getGeneric(String fieldName) {
+      return null;
+    }
+
+    @Override
+    public void setStringArray(String fieldName, List<String> value) {
+
+    }
+
+    @Override
     public Map<String, String> getMultiAsString(String fieldName) {
       return null;
     }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3323543b/gobblin-core/src/test/resources/converter/fieldPickInput.avsc
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/resources/converter/fieldPickInput.avsc 
b/gobblin-core/src/test/resources/converter/fieldPickInput.avsc
index 29b40a1..4da3fdf 100644
--- a/gobblin-core/src/test/resources/converter/fieldPickInput.avsc
+++ b/gobblin-core/src/test/resources/converter/fieldPickInput.avsc
@@ -7,6 +7,7 @@
      {"name": "favorite_color", "type": ["string", "null"]},
      {"name": "date_of_birth", "type": "long"},
      {"name": "last_modified", "type": "long"},
-     {"name": "created", "type": "long"}
+     {"name": "created", "type": "long"},
+     {"name": "favorite_quotes", "type": [{ "type": "array", "items": 
"string"}, "null"], "default": null}
  ]
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3323543b/gobblin-core/src/test/resources/converter/fieldPickInput_arrays.avro
----------------------------------------------------------------------
diff --git 
a/gobblin-core/src/test/resources/converter/fieldPickInput_arrays.avro 
b/gobblin-core/src/test/resources/converter/fieldPickInput_arrays.avro
new file mode 100644
index 0000000..c10a607
Binary files /dev/null and 
b/gobblin-core/src/test/resources/converter/fieldPickInput_arrays.avro differ

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3323543b/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/AvroStringFieldDecryptorConverter.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/AvroStringFieldDecryptorConverter.java
 
b/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/AvroStringFieldDecryptorConverter.java
new file mode 100644
index 0000000..59408c3
--- /dev/null
+++ 
b/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/AvroStringFieldDecryptorConverter.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.converter;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+
+import org.apache.gobblin.recordaccess.AvroGenericRecordAccessor;
+import org.apache.gobblin.recordaccess.RecordAccessor;
+
+
+/**
+ * StringFieldDecryptor that works on Avro GenericRecords.
+ */
+public class AvroStringFieldDecryptorConverter extends 
StringFieldDecryptorConverter<Schema, GenericRecord> {
+  @Override
+  protected RecordAccessor getRecordAccessor(GenericRecord record) {
+    return new AvroGenericRecordAccessor(record);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3323543b/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/StringFieldDecryptorConverter.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/StringFieldDecryptorConverter.java
 
b/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/StringFieldDecryptorConverter.java
new file mode 100644
index 0000000..26f4bbd
--- /dev/null
+++ 
b/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/StringFieldDecryptorConverter.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.converter;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.gobblin.codec.StreamCodec;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.crypto.EncryptionConfigParser;
+import org.apache.gobblin.crypto.EncryptionFactory;
+import org.apache.gobblin.recordaccess.RecordAccessor;
+
+import com.google.common.base.Splitter;
+
+
+/**
+ * Converter that can decrypt a string field in place. (Note: that means the 
incoming
+ * record will be mutated!). Assumes that the input field is of string
+ * type and that the decryption algorithm chosen will output a UTF-8 encoded 
byte array.
+ */
+public abstract class StringFieldDecryptorConverter<SCHEMA, DATA> extends 
Converter<SCHEMA, SCHEMA, DATA, DATA> {
+  public static final String FIELDS_TO_DECRYPT_CONFIG_NAME = 
"converter.fieldsToDecrypt";
+
+  private StreamCodec decryptor;
+  private List<String> fieldsToDecrypt;
+
+  @Override
+  public Converter<SCHEMA, SCHEMA, DATA, DATA> init(WorkUnitState workUnit) {
+    super.init(workUnit);
+    Map<String, Object> config = EncryptionConfigParser
+        
.getConfigForBranch(EncryptionConfigParser.EntityType.CONVERTER_DECRYPT, 
getClass().getSimpleName(), workUnit);
+    decryptor = EncryptionFactory.buildStreamCryptoProvider(config);
+
+    String fieldsToDecryptConfig = 
workUnit.getProp(FIELDS_TO_DECRYPT_CONFIG_NAME, null);
+    if (fieldsToDecryptConfig == null) {
+      throw new IllegalArgumentException("Must fill in the " + 
FIELDS_TO_DECRYPT_CONFIG_NAME + " config option!");
+    }
+
+    fieldsToDecrypt = Splitter.on(',').splitToList(fieldsToDecryptConfig);
+
+    return this;
+  }
+
+  @Override
+  public SCHEMA convertSchema(SCHEMA inputSchema, WorkUnitState workUnit)
+      throws SchemaConversionException {
+    return inputSchema;
+  }
+
+  @Override
+  public Iterable<DATA> convertRecord(SCHEMA outputSchema, DATA inputRecord, WorkUnitState workUnit)
+      throws DataConversionException {
+    RecordAccessor accessor = getRecordAccessor(inputRecord);
+    // Decrypts each configured field in place: inputRecord itself is mutated and returned.
+    for (String field : fieldsToDecrypt) {
+      Map<String, Object> stringsToDecrypt = accessor.getMultiGeneric(field);
+      try {
+        for (Map.Entry<String, Object> entry : stringsToDecrypt.entrySet()) {
+          if (entry.getValue() instanceof String) {
+            String s = decryptString((String) entry.getValue());
+            accessor.set(entry.getKey(), s);
+          } else if (entry.getValue() instanceof List) {
+            List<String> decryptedValues = new ArrayList<>();
+            for (Object val : (List)entry.getValue()) {
+              if (!(val instanceof String)) {
+                throw new IllegalArgumentException("Expected List of Strings, but encountered a value of type "
+                  + val.getClass().getCanonicalName());
+              }
+              decryptedValues.add(decryptString((String)val));
+            }
+
+            accessor.setStringArray(entry.getKey(), decryptedValues);
+          } else {
+            throw new IllegalArgumentException(
+                "Expected field to be of type String or List<String>, was " + entry.getValue().getClass()
+                    .getCanonicalName());
+          }
+        }
+      } catch (IOException | IllegalArgumentException | IllegalStateException e) {
+        // Failures surface as DataConversionException carrying the field name and the original cause.
+        throw new DataConversionException("Error while decrypting field " + field + ": " + e.getMessage(), e);
+      }
+    }
+
+    return Collections.singleton(inputRecord);
+  }
+
+  protected List<String> getFieldsToDecrypt() {
+    return fieldsToDecrypt;
+  }
+
+  protected String decryptString(String val)
+      throws IOException {
+    byte[] encryptedBytes = val.getBytes(StandardCharsets.UTF_8);
+
+    ByteArrayInputStream inStream = new ByteArrayInputStream(encryptedBytes);
+
+    try (InputStream cipherStream = decryptor.decodeInputStream(inStream);
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
+      IOUtils.copy(cipherStream, outputStream);
+
+      byte[] decryptedBytes = outputStream.toByteArray();
+      return new String(decryptedBytes, StandardCharsets.UTF_8);
+    }
+  }
+
+  protected abstract RecordAccessor getRecordAccessor(DATA record);
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3323543b/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/StringFieldEncryptorConverter.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/StringFieldEncryptorConverter.java
 
b/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/StringFieldEncryptorConverter.java
index f242ec5..e9b21ed 100644
--- 
a/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/StringFieldEncryptorConverter.java
+++ 
b/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/StringFieldEncryptorConverter.java
@@ -20,6 +20,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -72,30 +73,48 @@ public abstract class StringFieldEncryptorConverter<SCHEMA, 
DATA> extends Conver
     RecordAccessor accessor = getRecordAccessor(inputRecord);
 
     for (String field : fieldsToEncrypt) {
-      Map<String, String> stringsToEncrypt = accessor.getMultiAsString(field);
-
-      for (Map.Entry<String, String> entry : stringsToEncrypt.entrySet()) {
-        byte[] bytes = entry.getValue().getBytes(StandardCharsets.UTF_8);
-
-        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+      Map<String, Object> stringsToEncrypt = accessor.getMultiGeneric(field);
 
+      for (Map.Entry<String, Object> entry : stringsToEncrypt.entrySet()) {
         try {
-          OutputStream cipherStream = 
encryptor.encodeOutputStream(outputStream);
-          cipherStream.write(bytes);
-          cipherStream.flush();
-          cipherStream.close();
+          if (entry.getValue() instanceof String) {
+            accessor.set(entry.getKey(), encryptString((String) 
entry.getValue()));
+          } else if (entry.getValue() instanceof List) {
+            List<String> encryptedVals = new ArrayList<>();
+
+            for (Object val: (List)entry.getValue()) {
+              if (!(val instanceof String)) {
+                throw new IllegalArgumentException("Unexpected type " + 
val.getClass().getCanonicalName() +
+                    " while encrypting field " + field);
+              }
+
+              encryptedVals.add(encryptString((String)val));
+            }
+
+            accessor.setStringArray(entry.getKey(), encryptedVals);
+          }
         } catch (IOException | IllegalArgumentException | 
IllegalStateException e) {
           throw new DataConversionException("Error while encrypting field " + 
field + ": " + e.getMessage(), e);
         }
-
-        byte[] cipherBytes = outputStream.toByteArray();
-        accessor.set(entry.getKey(), new String(cipherBytes, 
StandardCharsets.UTF_8));
       }
     }
 
     return Collections.singleton(inputRecord);
   }
 
+  /** Encrypts the UTF-8 bytes of val with the configured encryptor and returns the output decoded as UTF-8. */
+  private String encryptString(String val)
+      throws IOException {
+    byte[] bytes = val.getBytes(StandardCharsets.UTF_8);
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    // try-with-resources closes the cipher stream even when write() or flush() throws
+    try (OutputStream cipherStream = encryptor.encodeOutputStream(outputStream)) {
+      cipherStream.write(bytes);
+      cipherStream.flush();
+    }
+    return new String(outputStream.toByteArray(), StandardCharsets.UTF_8);
+  }
+
   protected List<String> getFieldsToEncrypt() {
     return fieldsToEncrypt;
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3323543b/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/converter/AvroStringFieldDecryptorConverterTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/converter/AvroStringFieldDecryptorConverterTest.java
 
b/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/converter/AvroStringFieldDecryptorConverterTest.java
new file mode 100644
index 0000000..36be27d
--- /dev/null
+++ 
b/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/converter/AvroStringFieldDecryptorConverterTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.converter;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Maps;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.test.TestUtils;
+import org.apache.gobblin.test.crypto.InsecureShiftCodec;
+
+
+public class AvroStringFieldDecryptorConverterTest {
+
+  @Test
+  public void testConversion()
+      throws DataConversionException, IOException, SchemaConversionException {
+    AvroStringFieldDecryptorConverter converter = new 
AvroStringFieldDecryptorConverter();
+    WorkUnitState wuState = new WorkUnitState();
+
+    wuState.getJobState().setProp("converter.fieldsToDecrypt", "field1");
+    
wuState.getJobState().setProp("converter.decrypt.AvroStringFieldDecryptorConverter.algorithm",
 "insecure_shift");
+
+    converter.init(wuState);
+    GenericRecord inputRecord = TestUtils.generateRandomAvroRecord();
+
+    Schema inputSchema = inputRecord.getSchema();
+    Schema outputSchema = converter.convertSchema(inputSchema, wuState);
+
+    String fieldValue = (String) inputRecord.get("field1");
+
+    Iterable<GenericRecord> recordIt = converter.convertRecord(outputSchema, 
inputRecord, wuState);
+    GenericRecord decryptedRecord = recordIt.iterator().next();
+
+    Assert.assertEquals(outputSchema, inputSchema);
+    String decryptedValue = (String) decryptedRecord.get("field1");
+
+    InsecureShiftCodec codec = new InsecureShiftCodec(Maps.<String, 
Object>newHashMap());
+    InputStream in = codec.decodeInputStream(new 
ByteArrayInputStream(fieldValue.getBytes(StandardCharsets.UTF_8)));
+    byte[] expectedDecryptedValue = new byte[in.available()];
+    in.read(expectedDecryptedValue);
+
+    Assert.assertEquals(new String(expectedDecryptedValue, 
StandardCharsets.UTF_8), decryptedValue);
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testArrayDecryption()
+      throws DataConversionException, IOException, SchemaConversionException {
+    AvroStringFieldDecryptorConverter converter = new 
AvroStringFieldDecryptorConverter();
+    WorkUnitState wuState = new WorkUnitState();
+
+    wuState.getJobState().setProp("converter.fieldsToDecrypt", "array1");
+    
wuState.getJobState().setProp("converter.decrypt.AvroStringFieldDecryptorConverter.algorithm",
 "insecure_shift");
+
+    converter.init(wuState);
+    GenericRecord inputRecord = generateRecordWithArrays();
+
+    Schema inputSchema = inputRecord.getSchema();
+    Schema outputSchema = converter.convertSchema(inputSchema, wuState);
+
+    GenericData.Array<String> fieldValue = (GenericData.Array<String>) 
inputRecord.get("array1");
+
+    Iterable<GenericRecord> recordIt = converter.convertRecord(outputSchema, 
inputRecord, wuState);
+    GenericRecord decryptedRecord = recordIt.iterator().next();
+
+    Assert.assertEquals(outputSchema, inputSchema);
+    GenericData.Array<String> decryptedValue = (GenericData.Array<String>) 
decryptedRecord.get("array1");
+
+    for (int i = 0; i < decryptedValue.size(); i++) {
+      assertDecryptedValuesEqual(decryptedValue.get(i), fieldValue.get(i));
+    }
+  }
+
+  private void assertDecryptedValuesEqual(String decryptedValue, String 
originalValue) throws IOException {
+    InsecureShiftCodec codec = new InsecureShiftCodec(Maps.<String, 
Object>newHashMap());
+    InputStream in = codec.decodeInputStream(new 
ByteArrayInputStream(originalValue.getBytes(StandardCharsets.UTF_8)));
+    byte[] expectedDecryptedValue = new byte[in.available()];
+    in.read(expectedDecryptedValue);
+
+    Assert.assertEquals(new String(expectedDecryptedValue, 
StandardCharsets.UTF_8), decryptedValue);
+  }
+
+  private GenericRecord getRecordFromFile(String path) throws IOException {
+    DatumReader<GenericRecord> reader = new GenericDatumReader<>();
+    // try-with-resources so the Avro file reader is closed once the record has been read
+    try (DataFileReader<GenericRecord> dataFileReader =
+        new DataFileReader<GenericRecord>(new File(path), reader)) {
+      // Only the first record is wanted; null is returned when the file holds no records.
+      return dataFileReader.hasNext() ? dataFileReader.next() : null;
+    }
+  }
+
+  private GenericRecord generateRecordWithArrays() {
+    ArrayList<Schema.Field> fields = new ArrayList<Schema.Field>();
+    String fieldName = "array1";
+    Schema fieldSchema = Schema.createArray(Schema.create(Schema.Type.STRING));
+    String docString = "doc";
+    fields.add(new Schema.Field(fieldName, fieldSchema, docString, null));
+    Schema schema = Schema.createRecord("name", docString, "test", false);
+    schema.setFields(fields);
+
+    GenericData.Record record = new GenericData.Record(schema);
+    GenericData.Array<String> arr = new GenericData.Array<>(2, fieldSchema);
+    arr.add("foobar");
+    arr.add("foobaz");
+
+    record.put("array1", arr);
+
+    return record;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3323543b/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/converter/AvroStringFieldEncryptorConverterTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/converter/AvroStringFieldEncryptorConverterTest.java
 
b/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/converter/AvroStringFieldEncryptorConverterTest.java
index e16401d..5078ae8 100644
--- 
a/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/converter/AvroStringFieldEncryptorConverterTest.java
+++ 
b/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/converter/AvroStringFieldEncryptorConverterTest.java
@@ -17,6 +17,7 @@
 package org.apache.gobblin.converter;
 
 import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -26,9 +27,14 @@ import java.util.List;
 
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileReader;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -37,6 +43,7 @@ import com.google.common.collect.Maps;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.test.TestUtils;
 import org.apache.gobblin.test.crypto.InsecureShiftCodec;
+import org.testng.collections.Lists;
 
 
 public class AvroStringFieldEncryptorConverterTest {
@@ -83,10 +90,61 @@ public class AvroStringFieldEncryptorConverterTest {
     Assert.assertEquals(decryptedValues, origValues);
   }
 
-  private GenericRecord getRecordFromFile(String path) throws IOException {
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testEncryptionOfArray()
+      throws SchemaConversionException, DataConversionException, IOException {
+    AvroStringFieldEncryptorConverter converter = new AvroStringFieldEncryptorConverter();
+    WorkUnitState wuState = new WorkUnitState();
+
+    wuState.getJobState().setProp("converter.fieldsToEncrypt", "favorite_quotes");
+    wuState.getJobState().setProp("converter.encrypt.algorithm", "insecure_shift");
+
+    converter.init(wuState);
+    GenericRecord inputRecord =
+        getRecordFromFile(getClass().getClassLoader().getResource("fieldPickInput_arrays.avro").getPath());
+    GenericArray origValues = (GenericArray) inputRecord.get("favorite_quotes");
+    for (int i = 0; i < origValues.size(); i++) {
+      origValues.set(i, origValues.get(i).toString());
+    }
+
+    Schema inputSchema = inputRecord.getSchema();
+    Schema outputSchema = converter.convertSchema(inputSchema, wuState);
+
+    Iterable<GenericRecord> recordIt = converter.convertRecord(outputSchema, inputRecord, wuState);
+    GenericRecord encryptedRecord = recordIt.iterator().next();
+
+    Assert.assertEquals(outputSchema, inputSchema);
+
+    GenericArray<String> encryptedVals = (GenericArray<String>) encryptedRecord.get("favorite_quotes");
+    List<String> decryptedValues = Lists.newArrayList();
+    for (String encryptedValue: encryptedVals) {
+      InsecureShiftCodec codec = new InsecureShiftCodec(Maps.<String, Object>newHashMap());
+      InputStream in =
+          codec.decodeInputStream(new ByteArrayInputStream(encryptedValue.getBytes(StandardCharsets.UTF_8)));
+      byte[] decryptedValue = new byte[in.available()];
+      in.read(decryptedValue);
+      decryptedValues.add(new String(decryptedValue, StandardCharsets.UTF_8));
+    }
+
+    Assert.assertEquals(decryptedValues, origValues);
+  }
+
+  private GenericArray<String> buildTestArray() {
+    Schema s = Schema.createArray(Schema.create(Schema.Type.STRING));
+    GenericArray<String> arr = new GenericData.Array<>(3, s);
+    arr.add("one");
+    arr.add("two");
+    arr.add("three");
+
+    return arr;
+  }
+
+  private GenericRecord getRecordFromFile(String path)
+      throws IOException {
     DatumReader<GenericRecord> reader = new GenericDatumReader<>();
     DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(new File(path), reader);
-    while (dataFileReader.hasNext()) {
+    if (dataFileReader.hasNext()) {
       return dataFileReader.next();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3323543b/gobblin-modules/gobblin-crypto-provider/src/test/resources/fieldPickInput_arrays.avro
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-crypto-provider/src/test/resources/fieldPickInput_arrays.avro b/gobblin-modules/gobblin-crypto-provider/src/test/resources/fieldPickInput_arrays.avro
new file mode 100644
index 0000000..c10a607
Binary files /dev/null and b/gobblin-modules/gobblin-crypto-provider/src/test/resources/fieldPickInput_arrays.avro differ

Reply via email to