Repository: gora
Updated Branches:
refs/heads/master 560704c3a -> 4bbf52ee7
Switched Accumulo Dependency to 1.7.1 and ported AccumuloStore Class to
work with accumulo 1.7.1
Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/ce945da3
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/ce945da3
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/ce945da3
Branch: refs/heads/master
Commit: ce945da33c42463567c40a2d35fc34a0e23618d6
Parents: 560704c
Author: Vaibhav Thapliyal <[email protected]>
Authored: Fri Feb 17 11:39:15 2017 +0530
Committer: vaibhavthapliyal <[email protected]>
Committed: Fri Feb 17 11:39:15 2017 +0530
----------------------------------------------------------------------
gora-accumulo/pom.xml | 2 +-
.../gora/accumulo/store/AccumuloStore.java | 1886 +++++++++---------
2 files changed, 956 insertions(+), 932 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/gora/blob/ce945da3/gora-accumulo/pom.xml
----------------------------------------------------------------------
diff --git a/gora-accumulo/pom.xml b/gora-accumulo/pom.xml
index bc131fa..e13f7a7 100644
--- a/gora-accumulo/pom.xml
+++ b/gora-accumulo/pom.xml
@@ -50,7 +50,7 @@
</ciManagement>
<properties>
- <accumulo.version>1.6.4</accumulo.version>
+ <accumulo.version>1.7.1</accumulo.version>
<osgi.import>*</osgi.import>
<osgi.export>org.apache.gora.accumulo*;version="${project.version}";-noimport:=true</osgi.export>
</properties>
http://git-wip-us.apache.org/repos/asf/gora/blob/ce945da3/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java
----------------------------------------------------------------------
diff --git
a/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java
b/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java
index a68cdaa..a4cddce 100644
---
a/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java
+++
b/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java
@@ -50,25 +50,27 @@ import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.TableOfflineException;
import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.impl.ClientContext;
import org.apache.accumulo.core.client.impl.Tables;
import org.apache.accumulo.core.client.impl.TabletLocator;
import org.apache.accumulo.core.client.mock.MockConnector;
import org.apache.accumulo.core.client.mock.MockInstance;
-import org.apache.accumulo.core.client.mock.MockTabletLocator;
+import org.apache.accumulo.core.client.mock.impl.MockTabletLocator;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.data.impl.KeyExtent;
import org.apache.accumulo.core.iterators.SortedKeyIterator;
import org.apache.accumulo.core.iterators.user.TimestampFilter;
import org.apache.accumulo.core.master.state.tables.TableState;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.core.client.impl.Credentials;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.avro.Schema;
@@ -108,933 +110,955 @@ import org.w3c.dom.NodeList;
/**
* Implementation of a Accumulo data store to be used by gora.
*
- * @param <K> class to be used for the key
- * @param <T> class to be persisted within the store
+ * @param <K>
+ * class to be used for the key
+ * @param <T>
+ * class to be persisted within the store
*/
-public class AccumuloStore<K,T extends PersistentBase> extends
DataStoreBase<K,T> {
-
- protected static final String MOCK_PROPERTY = "accumulo.mock";
- protected static final String INSTANCE_NAME_PROPERTY = "accumulo.instance";
- protected static final String ZOOKEEPERS_NAME_PROPERTY =
"accumulo.zookeepers";
- protected static final String USERNAME_PROPERTY = "accumulo.user";
- protected static final String PASSWORD_PROPERTY = "accumulo.password";
- protected static final String DEFAULT_MAPPING_FILE =
"gora-accumulo-mapping.xml";
-
- private final static String UNKOWN = "Unknown type ";
-
- private Connector conn;
- private BatchWriter batchWriter;
- private AccumuloMapping mapping;
- private Credentials credentials;
- private Encoder encoder;
-
- public static final Logger LOG =
LoggerFactory.getLogger(AccumuloStore.class);
-
- public Object fromBytes(Schema schema, byte[] data) throws IOException {
- Schema fromSchema = null;
- if (schema.getType() == Type.UNION) {
- try {
- Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
- int unionIndex = decoder.readIndex();
- List<Schema> possibleTypes = schema.getTypes();
- fromSchema = possibleTypes.get(unionIndex);
- Schema effectiveSchema = possibleTypes.get(unionIndex);
- if (effectiveSchema.getType() == Type.NULL) {
- decoder.readNull();
- return null;
- } else {
- data = decoder.readBytes(null).array();
- }
- } catch (IOException e) {
- LOG.error(e.getMessage());
- throw new GoraException("Error decoding union type: ", e);
- }
- } else {
- fromSchema = schema;
- }
- return fromBytes(encoder, fromSchema, data);
- }
-
- public static Object fromBytes(Encoder encoder, Schema schema, byte data[])
throws IOException {
- switch (schema.getType()) {
- case BOOLEAN:
- return encoder.decodeBoolean(data);
- case DOUBLE:
- return encoder.decodeDouble(data);
- case FLOAT:
- return encoder.decodeFloat(data);
- case INT:
- return encoder.decodeInt(data);
- case LONG:
- return encoder.decodeLong(data);
- case STRING:
- return new Utf8(data);
- case BYTES:
- return ByteBuffer.wrap(data);
- case ENUM:
- return AvroUtils.getEnumValue(schema, encoder.decodeInt(data));
- case ARRAY:
- break;
- case FIXED:
- break;
- case MAP:
- break;
- case NULL:
- break;
- case RECORD:
- break;
- case UNION:
- break;
- default:
- break;
- }
- throw new IllegalArgumentException(UNKOWN + schema.getType());
-
- }
-
- private static byte[] getBytes(Text text) {
- byte[] bytes = text.getBytes();
- if (bytes.length != text.getLength()) {
- bytes = new byte[text.getLength()];
- System.arraycopy(text.getBytes(), 0, bytes, 0, bytes.length);
- }
- return bytes;
- }
-
- public K fromBytes(Class<K> clazz, byte[] val) {
- return fromBytes(encoder, clazz, val);
- }
-
- @SuppressWarnings("unchecked")
- public static <K> K fromBytes(Encoder encoder, Class<K> clazz, byte[] val) {
- try {
- if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)) {
- return (K) Byte.valueOf(encoder.decodeByte(val));
- } else if (clazz.equals(Boolean.TYPE) || clazz.equals(Boolean.class)) {
- return (K) Boolean.valueOf(encoder.decodeBoolean(val));
- } else if (clazz.equals(Short.TYPE) || clazz.equals(Short.class)) {
- return (K) Short.valueOf(encoder.decodeShort(val));
- } else if (clazz.equals(Integer.TYPE) || clazz.equals(Integer.class)) {
- return (K) Integer.valueOf(encoder.decodeInt(val));
- } else if (clazz.equals(Long.TYPE) || clazz.equals(Long.class)) {
- return (K) Long.valueOf(encoder.decodeLong(val));
- } else if (clazz.equals(Float.TYPE) || clazz.equals(Float.class)) {
- return (K) Float.valueOf(encoder.decodeFloat(val));
- } else if (clazz.equals(Double.TYPE) || clazz.equals(Double.class)) {
- return (K) Double.valueOf(encoder.decodeDouble(val));
- } else if (clazz.equals(String.class)) {
- return (K) new String(val, "UTF-8");
- } else if (clazz.equals(Utf8.class)) {
- return (K) new Utf8(val);
- }
-
- throw new IllegalArgumentException(UNKOWN + clazz.getName());
- } catch (IOException ioe) {
- LOG.error(ioe.getMessage());
- throw new RuntimeException(ioe);
- }
- }
-
- private static byte[] copyIfNeeded(byte b[], int offset, int len) {
- if (len != b.length || offset != 0) {
- byte[] copy = new byte[len];
- System.arraycopy(b, offset, copy, 0, copy.length);
- b = copy;
- }
- return b;
- }
-
- public byte[] toBytes(Schema toSchema, Object o) {
- if (toSchema != null && toSchema.getType() == Type.UNION) {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- org.apache.avro.io.BinaryEncoder avroEncoder =
EncoderFactory.get().binaryEncoder(baos, null);
- int unionIndex = 0;
- try {
- if (o == null) {
- unionIndex = firstNullSchemaTypeIndex(toSchema);
- avroEncoder.writeIndex(unionIndex);
- avroEncoder.writeNull();
- } else {
- unionIndex = firstNotNullSchemaTypeIndex(toSchema);
- avroEncoder.writeIndex(unionIndex);
- avroEncoder.writeBytes(toBytes(o));
- }
- avroEncoder.flush();
- return baos.toByteArray();
- } catch (IOException e) {
- LOG.error(e.getMessage());
- return toBytes(o);
- }
- } else {
- return toBytes(o);
- }
- }
-
- private int firstNullSchemaTypeIndex(Schema toSchema) {
- List<Schema> possibleTypes = toSchema.getTypes();
- int unionIndex = 0;
- for (int i = 0; i < possibleTypes.size(); i++ ) {
- Type pType = possibleTypes.get(i).getType();
- if (pType == Type.NULL) { // FIXME HUGE kludge to pass tests
- unionIndex = i; break;
- }
- }
- return unionIndex;
- }
-
- private int firstNotNullSchemaTypeIndex(Schema toSchema) {
- List<Schema> possibleTypes = toSchema.getTypes();
- int unionIndex = 0;
- for (int i = 0; i < possibleTypes.size(); i++ ) {
- Type pType = possibleTypes.get(i).getType();
- if (pType != Type.NULL) { // FIXME HUGE kludge to pass tests
- unionIndex = i; break;
- }
- }
- return unionIndex;
- }
-
- public byte[] toBytes(Object o) {
- return toBytes(encoder, o);
- }
-
- public static byte[] toBytes(Encoder encoder, Object o) {
-
- try {
- if (o instanceof String) {
- return ((String) o).getBytes("UTF-8");
- } else if (o instanceof Utf8) {
- return copyIfNeeded(((Utf8) o).getBytes(), 0, ((Utf8)
o).getByteLength());
- } else if (o instanceof ByteBuffer) {
- return copyIfNeeded(((ByteBuffer) o).array(), ((ByteBuffer)
o).arrayOffset() + ((ByteBuffer) o).position(), ((ByteBuffer) o).remaining());
- } else if (o instanceof Long) {
- return encoder.encodeLong((Long) o);
- } else if (o instanceof Integer) {
- return encoder.encodeInt((Integer) o);
- } else if (o instanceof Short) {
- return encoder.encodeShort((Short) o);
- } else if (o instanceof Byte) {
- return encoder.encodeByte((Byte) o);
- } else if (o instanceof Boolean) {
- return encoder.encodeBoolean((Boolean) o);
- } else if (o instanceof Float) {
- return encoder.encodeFloat((Float) o);
- } else if (o instanceof Double) {
- return encoder.encodeDouble((Double) o);
- } else if (o instanceof Enum) {
- return encoder.encodeInt(((Enum<?>) o).ordinal());
- }
- } catch (IOException ioe) {
- throw new RuntimeException(ioe);
- }
-
- throw new IllegalArgumentException(UNKOWN + o.getClass().getName());
- }
-
- private BatchWriter getBatchWriter() throws IOException {
- if (batchWriter == null)
- try {
- BatchWriterConfig batchWriterConfig = new BatchWriterConfig();
- batchWriterConfig.setMaxMemory(10000000);
- batchWriterConfig.setMaxLatency(60000L, TimeUnit.MILLISECONDS);
- batchWriterConfig.setMaxWriteThreads(4);
- batchWriter = conn.createBatchWriter(mapping.tableName,
batchWriterConfig);
- } catch (TableNotFoundException e) {
- throw new IOException(e);
- }
- return batchWriter;
- }
-
- /**
- * Initialize the data store by reading the credentials, setting the
client's properties up and
- * reading the mapping file. Initialize is called when then the call to
- * {@link org.apache.gora.store.DataStoreFactory#createDataStore} is made.
- *
- * @param keyClass
- * @param persistentClass
- * @param properties
- */
- @Override
- public void initialize(Class<K> keyClass, Class<T> persistentClass,
Properties properties) {
- try{
- super.initialize(keyClass, persistentClass, properties);
-
- String mock = DataStoreFactory.findProperty(properties, this,
MOCK_PROPERTY, null);
- String mappingFile = DataStoreFactory.getMappingFile(properties, this,
DEFAULT_MAPPING_FILE);
- String user = DataStoreFactory.findProperty(properties, this,
USERNAME_PROPERTY, null);
- String password = DataStoreFactory.findProperty(properties, this,
PASSWORD_PROPERTY, null);
-
- mapping = readMapping(mappingFile);
-
- if (mapping.encoder == null || "".equals(mapping.encoder)) {
- encoder = new BinaryEncoder();
- } else {
- try {
- encoder = (Encoder)
getClass().getClassLoader().loadClass(mapping.encoder).newInstance();
- } catch (InstantiationException | IllegalAccessException |
ClassNotFoundException e) {
- throw new IOException(e);
- }
- }
-
- try {
- AuthenticationToken token = new PasswordToken(password);
- if (mock == null || !mock.equals("true")) {
- String instance = DataStoreFactory.findProperty(properties, this,
INSTANCE_NAME_PROPERTY, null);
- String zookeepers = DataStoreFactory.findProperty(properties, this,
ZOOKEEPERS_NAME_PROPERTY, null);
- conn = new ZooKeeperInstance(instance,
zookeepers).getConnector(user, token);
- } else {
- conn = new MockInstance().getConnector(user, token);
- }
- credentials = new Credentials(user, token);
-
- if (autoCreateSchema && !schemaExists())
- createSchema();
- } catch (AccumuloException | AccumuloSecurityException e) {
- throw new IOException(e);
- }
- } catch(IOException e){
- LOG.error(e.getMessage(), e);
- }
- }
-
- protected AccumuloMapping readMapping(String filename) throws IOException {
- try {
-
- AccumuloMapping mapping = new AccumuloMapping();
-
- DocumentBuilder db =
DocumentBuilderFactory.newInstance().newDocumentBuilder();
- Document dom =
db.parse(getClass().getClassLoader().getResourceAsStream(filename));
-
- Element root = dom.getDocumentElement();
-
- NodeList nl = root.getElementsByTagName("class");
- for (int i = 0; i < nl.getLength(); i++) {
-
- Element classElement = (Element) nl.item(i);
- if
(classElement.getAttribute("keyClass").equals(keyClass.getCanonicalName())
- &&
classElement.getAttribute("name").equals(persistentClass.getCanonicalName())) {
-
- mapping.tableName =
getSchemaName(classElement.getAttribute("table"), persistentClass);
- mapping.encoder = classElement.getAttribute("encoder");
-
- NodeList fields = classElement.getElementsByTagName("field");
- for (int j = 0; j < fields.getLength(); j++) {
- Element fieldElement = (Element) fields.item(j);
-
- String name = fieldElement.getAttribute("name");
- String family = fieldElement.getAttribute("family");
- String qualifier = fieldElement.getAttribute("qualifier");
- if ("".equals(qualifier))
- qualifier = null;
-
- Pair<Text,Text> col = new Pair<>(new Text(family), qualifier ==
null ? null : new Text(qualifier));
- mapping.fieldMap.put(name, col);
- mapping.columnMap.put(col, name);
- }
- }
-
- }
-
- if (mapping.tableName == null) {
- throw new GoraException("Please define the accumulo 'table' name
mapping in " + filename + " for " + persistentClass.getCanonicalName());
- }
-
- nl = root.getElementsByTagName("table");
- for (int i = 0; i < nl.getLength(); i++) {
- Element tableElement = (Element) nl.item(i);
- if (tableElement.getAttribute("name").equals(mapping.tableName)) {
- NodeList configs = tableElement.getElementsByTagName("config");
- for (int j = 0; j < configs.getLength(); j++) {
- Element configElement = (Element) configs.item(j);
- String key = configElement.getAttribute("key");
- String val = configElement.getAttribute("value");
- mapping.tableConfig.put(key, val);
- }
- }
- }
-
- return mapping;
- } catch (Exception ex) {
- throw new IOException("Unable to read " + filename, ex);
- }
-
- }
-
- @Override
- public String getSchemaName() {
- return mapping.tableName;
- }
-
- @Override
- public void createSchema() {
- try {
- conn.tableOperations().create(mapping.tableName);
- Set<Entry<String,String>> es = mapping.tableConfig.entrySet();
- for (Entry<String,String> entry : es) {
- conn.tableOperations().setProperty(mapping.tableName, entry.getKey(),
entry.getValue());
- }
-
- } catch (AccumuloException | AccumuloSecurityException e) {
- LOG.error(e.getMessage(), e);
- } catch (TableExistsException e) {
- LOG.debug(e.getMessage(), e);
- }
- }
-
- @Override
- public void deleteSchema() {
- try {
- if (batchWriter != null)
- batchWriter.close();
- batchWriter = null;
- conn.tableOperations().delete(mapping.tableName);
- } catch (AccumuloException | AccumuloSecurityException |
TableNotFoundException e) {
- LOG.error(e.getMessage(), e);
- }
- }
-
- @Override
- public boolean schemaExists() {
- return conn.tableOperations().exists(mapping.tableName);
- }
-
- public ByteSequence populate(Iterator<Entry<Key,Value>> iter, T persistent)
throws IOException {
- ByteSequence row = null;
-
- Map<Utf8, Object> currentMap = null;
- List currentArray = null;
- Text currentFam = null;
- int currentPos = 0;
- Schema currentSchema = null;
- Field currentField = null;
-
- BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(new byte[0],
null);
-
- while (iter.hasNext()) {
- Entry<Key,Value> entry = iter.next();
-
- if (row == null) {
- row = entry.getKey().getRowData();
- }
- byte[] val = entry.getValue().get();
-
- Field field = fieldMap.get(getFieldName(entry));
-
- if (currentMap != null) {
- if (currentFam.equals(entry.getKey().getColumnFamily())) {
- currentMap.put(new
Utf8(entry.getKey().getColumnQualifierData().toArray()),
- fromBytes(currentSchema, entry.getValue().get()));
- continue;
- } else {
- persistent.put(currentPos, currentMap);
- currentMap = null;
- }
- } else if (currentArray != null) {
- if (currentFam.equals(entry.getKey().getColumnFamily())) {
- currentArray.add(fromBytes(currentSchema, entry.getValue().get()));
- continue;
- } else {
- persistent.put(currentPos, new
GenericData.Array<T>(currentField.schema(), currentArray));
- currentArray = null;
- }
- }
-
- switch (field.schema().getType()) {
- case MAP: // first entry only. Next are handled above on the next loop
- currentMap = new DirtyMapWrapper<>(new HashMap<Utf8, Object>());
- currentPos = field.pos();
- currentFam = entry.getKey().getColumnFamily();
- currentSchema = field.schema().getValueType();
-
- currentMap.put(new
Utf8(entry.getKey().getColumnQualifierData().toArray()),
- fromBytes(currentSchema, entry.getValue().get()));
- break;
- case ARRAY:
- currentArray = new DirtyListWrapper<>(new ArrayList<>());
- currentPos = field.pos();
- currentFam = entry.getKey().getColumnFamily();
- currentSchema = field.schema().getElementType();
- currentField = field;
-
- currentArray.add(fromBytes(currentSchema, entry.getValue().get()));
-
- break;
- case UNION:// default value of null acts like union with null
- Schema effectiveSchema = field.schema().getTypes()
- .get(firstNotNullSchemaTypeIndex(field.schema()));
- // map and array were coded without union index so need to be read the
same way
- if (effectiveSchema.getType() == Type.ARRAY) {
- currentArray = new DirtyListWrapper<>(new ArrayList<>());
- currentPos = field.pos();
- currentFam = entry.getKey().getColumnFamily();
- currentSchema = field.schema().getElementType();
- currentField = field;
-
- currentArray.add(fromBytes(currentSchema, entry.getValue().get()));
- break;
- }
- else if (effectiveSchema.getType() == Type.MAP) {
- currentMap = new DirtyMapWrapper<>(new HashMap<Utf8, Object>());
- currentPos = field.pos();
- currentFam = entry.getKey().getColumnFamily();
- currentSchema = effectiveSchema.getValueType();
-
- currentMap.put(new
Utf8(entry.getKey().getColumnQualifierData().toArray()),
- fromBytes(currentSchema, entry.getValue().get()));
- break;
- }
- // continue like a regular top-level union
- case RECORD:
- SpecificDatumReader<?> reader = new
SpecificDatumReader<Schema>(field.schema());
- persistent.put(field.pos(), reader.read(null,
DecoderFactory.get().binaryDecoder(val, decoder)));
- break;
- default:
- persistent.put(field.pos(), fromBytes(field.schema(),
entry.getValue().get()));
- }
- }
-
- if (currentMap != null) {
- persistent.put(currentPos, currentMap);
- } else if (currentArray != null) {
- persistent.put(currentPos, new
GenericData.Array<T>(currentField.schema(), currentArray));
- }
-
- persistent.clearDirty();
-
- return row;
- }
-
- /**
- * Retrieve field name from entry.
- * @param entry The Key-Value entry
- * @return String The field name
- */
- private String getFieldName(Entry<Key, Value> entry) {
- String fieldName = mapping.columnMap.get(new
Pair<>(entry.getKey().getColumnFamily(),
- entry.getKey().getColumnQualifier()));
- if (fieldName == null) {
- fieldName = mapping.columnMap.get(new
Pair<Text,Text>(entry.getKey().getColumnFamily(), null));
- }
- return fieldName;
- }
-
- private void setFetchColumns(Scanner scanner, String[] fields) {
- fields = getFieldsToQuery(fields);
- for (String field : fields) {
- Pair<Text,Text> col = mapping.fieldMap.get(field);
- if (col != null) {
- if (col.getSecond() == null) {
- scanner.fetchColumnFamily(col.getFirst());
- } else {
- scanner.fetchColumn(col.getFirst(), col.getSecond());
- }
- } else {
- LOG.error("Mapping not found for field: {}", field);
- }
- }
- }
-
- @Override
- public T get(K key, String[] fields) {
- try {
- // TODO make isolated scanner optional?
- Scanner scanner = new
IsolatedScanner(conn.createScanner(mapping.tableName, Authorizations.EMPTY));
- Range rowRange = new Range(new Text(toBytes(key)));
-
- scanner.setRange(rowRange);
- setFetchColumns(scanner, fields);
-
- T persistent = newPersistent();
- ByteSequence row = populate(scanner.iterator(), persistent);
- if (row == null)
- return null;
- return persistent;
- } catch (TableNotFoundException e) {
- LOG.error(e.getMessage(), e);
- return null;
- } catch (IOException e) {
- LOG.error(e.getMessage(), e);
- return null;
- }
- }
-
- @Override
- public void put(K key, T val) {
-
- try{
- Mutation m = new Mutation(new Text(toBytes(key)));
-
- Schema schema = val.getSchema();
- List<Field> fields = schema.getFields();
- int count = 0;
-
- for (int i = 0; i < fields.size(); i++) {
- if (!val.isDirty(i)) {
- continue;
- }
- Field field = fields.get(i);
-
- Object o = val.get(field.pos());
-
- Pair<Text,Text> col = mapping.fieldMap.get(field.name());
-
- if (col == null) {
- throw new GoraException("Please define the gora to accumulo mapping
for field " + field.name());
- }
-
- switch (field.schema().getType()) {
- case MAP:
- count = putMap(m, count, field.schema().getValueType(), o, col,
field.name());
- break;
- case ARRAY:
- count = putArray(m, count, o, col, field.name());
- break;
- case UNION: // default value of null acts like union with null
- Schema effectiveSchema = field.schema().getTypes()
- .get(firstNotNullSchemaTypeIndex(field.schema()));
- // map and array need to compute qualifier
- if (effectiveSchema.getType() == Type.ARRAY) {
- count = putArray(m, count, o, col, field.name());
- break;
- }
- else if (effectiveSchema.getType() == Type.MAP) {
- count = putMap(m, count, effectiveSchema.getValueType(), o, col,
field.name());
- break;
- }
- // continue like a regular top-level union
- case RECORD:
- final SpecificDatumWriter<Object> writer = new
SpecificDatumWriter<>(field.schema());
- final byte[] byteData = IOUtils.serialize(writer,o);
- m.put(col.getFirst(), col.getSecond(), new Value(byteData));
- count++;
- break;
- default:
- m.put(col.getFirst(), col.getSecond(), new Value(toBytes(o)));
- count++;
- }
-
- }
-
- if (count > 0)
- try {
- getBatchWriter().addMutation(m);
- } catch (MutationsRejectedException e) {
- LOG.error(e.getMessage(), e);
- }
- } catch (IOException e) {
- LOG.error(e.getMessage(), e);
- }
- }
-
- private int putMap(Mutation m, int count, Schema valueType, Object o,
Pair<Text, Text> col, String fieldName) throws GoraException {
-
- // First of all we delete map field on accumulo store
- Text rowKey = new Text(m.getRow());
- Query<K, T> query = newQuery();
- query.setFields(fieldName);
- query.setStartKey((K)rowKey.toString());
- query.setEndKey((K)rowKey.toString());
- deleteByQuery(query);
- flush();
- if (o == null){
- return 0;
- }
-
- Set<?> es = ((Map<?, ?>)o).entrySet();
- for (Object entry : es) {
- Object mapKey = ((Entry<?, ?>) entry).getKey();
- Object mapVal = ((Entry<?, ?>) entry).getValue();
- if ((o instanceof DirtyMapWrapper && ((DirtyMapWrapper<?,
?>)o).isDirty())
- || !(o instanceof DirtyMapWrapper)) {
- m.put(col.getFirst(), new Text(toBytes(mapKey)), new
Value(toBytes(valueType, mapVal)));
- count++;
- }
- // TODO map value deletion
- }
- return count;
- }
-
- private int putArray(Mutation m, int count, Object o, Pair<Text, Text> col,
String fieldName) {
-
- // First of all we delete array field on accumulo store
- Text rowKey = new Text(m.getRow());
- Query<K, T> query = newQuery();
- query.setFields(fieldName);
- query.setStartKey((K)rowKey.toString());
- query.setEndKey((K)rowKey.toString());
- deleteByQuery(query);
- flush();
- if (o == null){
- return 0;
- }
-
- List<?> array = (List<?>) o; // both GenericArray and DirtyListWrapper
- int j = 0;
- for (Object item : array) {
- m.put(col.getFirst(), new Text(toBytes(j++)), new Value(toBytes(item)));
- count++;
- }
- return count;
- }
-
- @Override
- public boolean delete(K key) {
- Query<K,T> q = newQuery();
- q.setKey(key);
- return deleteByQuery(q) > 0;
- }
-
- @Override
- public long deleteByQuery(Query<K,T> query) {
- try {
- Scanner scanner = createScanner(query);
- // add iterator that drops values on the server side
- scanner.addScanIterator(new IteratorSetting(Integer.MAX_VALUE,
SortedKeyIterator.class));
- RowIterator iterator = new RowIterator(scanner.iterator());
-
- long count = 0;
-
- while (iterator.hasNext()) {
- Iterator<Entry<Key,Value>> row = iterator.next();
- Mutation m = null;
- while (row.hasNext()) {
- Entry<Key,Value> entry = row.next();
- Key key = entry.getKey();
- if (m == null)
- m = new Mutation(key.getRow());
- // TODO optimize to avoid continually creating column vis? prob does
not matter for empty
- m.putDelete(key.getColumnFamily(), key.getColumnQualifier(), new
ColumnVisibility(key.getColumnVisibility()), key.getTimestamp());
- }
- getBatchWriter().addMutation(m);
- count++;
- }
-
- return count;
- } catch (TableNotFoundException e) {
- // TODO return 0?
- LOG.error(e.getMessage(), e);
- return 0;
- } catch (MutationsRejectedException e) {
- LOG.error(e.getMessage(), e);
- return 0;
- } catch (IOException e){
- LOG.error(e.getMessage(), e);
- return 0;
- }
- }
-
- private Range createRange(Query<K,T> query) {
- Text startRow = null;
- Text endRow = null;
-
- if (query.getStartKey() != null)
- startRow = new Text(toBytes(query.getStartKey()));
-
- if (query.getEndKey() != null)
- endRow = new Text(toBytes(query.getEndKey()));
-
- return new Range(startRow, true, endRow, true);
-
- }
-
- private Scanner createScanner(Query<K,T> query) throws
TableNotFoundException {
- // TODO make isolated scanner optional?
- Scanner scanner = new
IsolatedScanner(conn.createScanner(mapping.tableName, Authorizations.EMPTY));
- setFetchColumns(scanner, query.getFields());
-
- scanner.setRange(createRange(query));
-
- if (query.getStartTime() != -1 || query.getEndTime() != -1) {
- IteratorSetting is = new IteratorSetting(30, TimestampFilter.class);
- if (query.getStartTime() != -1)
- TimestampFilter.setStart(is, query.getStartTime(), true);
- if (query.getEndTime() != -1)
- TimestampFilter.setEnd(is, query.getEndTime(), true);
-
- scanner.addScanIterator(is);
- }
-
- return scanner;
- }
-
- /**
- * Execute the query and return the result.
- */
- @Override
- public Result<K,T> execute(Query<K,T> query) {
- try {
- Scanner scanner = createScanner(query);
- return new AccumuloResult<>(this, query, scanner);
- } catch (TableNotFoundException e) {
- // TODO return empty result?
- LOG.error(e.getMessage(), e);
- return null;
- }
- }
-
- @Override
- public Query<K,T> newQuery() {
- return new AccumuloQuery<>(this);
- }
-
- Text pad(Text key, int bytes) {
- if (key.getLength() < bytes)
- key = new Text(key);
-
- while (key.getLength() < bytes) {
- key.append(new byte[] {0}, 0, 1);
- }
-
- return key;
- }
-
- @Override
- public List<PartitionQuery<K,T>> getPartitions(Query<K,T> query) throws
IOException {
- try {
- TabletLocator tl;
- if (conn instanceof MockConnector)
- tl = new MockTabletLocator();
- else
- tl = TabletLocator.getLocator(conn.getInstance(), new
Text(Tables.getTableId(conn.getInstance(), mapping.tableName)));
-
- Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<>();
-
- tl.invalidateCache();
- while (tl.binRanges(credentials,
Collections.singletonList(createRange(query)), binnedRanges).size() > 0) {
- // TODO log?
- if (!Tables.exists(conn.getInstance(),
Tables.getTableId(conn.getInstance(), mapping.tableName)))
- throw new
TableDeletedException(Tables.getTableId(conn.getInstance(), mapping.tableName));
- else if (Tables.getTableState(conn.getInstance(),
Tables.getTableId(conn.getInstance(), mapping.tableName)) == TableState.OFFLINE)
- throw new TableOfflineException(conn.getInstance(),
Tables.getTableId(conn.getInstance(), mapping.tableName));
- UtilWaitThread.sleep(100);
- tl.invalidateCache();
- }
-
- List<PartitionQuery<K,T>> ret = new ArrayList<>();
-
- Text startRow = null;
- Text endRow = null;
- if (query.getStartKey() != null)
- startRow = new Text(toBytes(query.getStartKey()));
- if (query.getEndKey() != null)
- endRow = new Text(toBytes(query.getEndKey()));
-
- //hadoop expects hostnames, accumulo keeps track of IPs... so need to
convert
- HashMap<String,String> hostNameCache = new HashMap<>();
-
- for (Entry<String,Map<KeyExtent,List<Range>>> entry :
binnedRanges.entrySet()) {
- String ip = entry.getKey().split(":", 2)[0];
- String location = hostNameCache.get(ip);
- if (location == null) {
- InetAddress inetAddress = InetAddress.getByName(ip);
- location = inetAddress.getHostName();
- hostNameCache.put(ip, location);
- }
-
- Map<KeyExtent,List<Range>> tablets = entry.getValue();
- for (KeyExtent ke : tablets.keySet()) {
-
- K startKey = null;
- if (startRow == null || !ke.contains(startRow)) {
- if (ke.getPrevEndRow() != null) {
- startKey = followingKey(encoder, getKeyClass(),
getBytes(ke.getPrevEndRow()));
- }
- } else {
- startKey = fromBytes(getKeyClass(), getBytes(startRow));
- }
-
- K endKey = null;
- if (endRow == null || !ke.contains(endRow)) {
- if (ke.getEndRow() != null)
- endKey = lastPossibleKey(encoder, getKeyClass(),
getBytes(ke.getEndRow()));
- } else {
- endKey = fromBytes(getKeyClass(), getBytes(endRow));
- }
-
- PartitionQueryImpl<K, T> pqi = new PartitionQueryImpl<>(query,
startKey, endKey, location);
- pqi.setConf(getConf());
- ret.add(pqi);
- }
- }
-
- return ret;
- } catch (TableNotFoundException | AccumuloException |
AccumuloSecurityException e) {
- throw new IOException(e);
- }
-
- }
-
- static <K> K lastPossibleKey(Encoder encoder, Class<K> clazz, byte[] er) {
-
- if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)) {
- throw new UnsupportedOperationException();
- } else if (clazz.equals(Boolean.TYPE) || clazz.equals(Boolean.class)) {
- throw new UnsupportedOperationException();
- } else if (clazz.equals(Short.TYPE) || clazz.equals(Short.class)) {
- return fromBytes(encoder, clazz, encoder.lastPossibleKey(2, er));
- } else if (clazz.equals(Integer.TYPE) || clazz.equals(Integer.class)) {
- return fromBytes(encoder, clazz, encoder.lastPossibleKey(4, er));
- } else if (clazz.equals(Long.TYPE) || clazz.equals(Long.class)) {
- return fromBytes(encoder, clazz, encoder.lastPossibleKey(8, er));
- } else if (clazz.equals(Float.TYPE) || clazz.equals(Float.class)) {
- return fromBytes(encoder, clazz, encoder.lastPossibleKey(4, er));
- } else if (clazz.equals(Double.TYPE) || clazz.equals(Double.class)) {
- return fromBytes(encoder, clazz, encoder.lastPossibleKey(8, er));
- } else if (clazz.equals(String.class)) {
- throw new UnsupportedOperationException();
- } else if (clazz.equals(Utf8.class)) {
- return fromBytes(encoder, clazz, er);
- }
-
- throw new IllegalArgumentException(UNKOWN + clazz.getName());
- }
-
- @SuppressWarnings("unchecked")
- static <K> K followingKey(Encoder encoder, Class<K> clazz, byte[] per) {
-
- if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)) {
- return (K) Byte.valueOf(encoder.followingKey(1, per)[0]);
- } else if (clazz.equals(Boolean.TYPE) || clazz.equals(Boolean.class)) {
- throw new UnsupportedOperationException();
- } else if (clazz.equals(Short.TYPE) || clazz.equals(Short.class)) {
- return fromBytes(encoder, clazz, encoder.followingKey(2, per));
- } else if (clazz.equals(Integer.TYPE) || clazz.equals(Integer.class)) {
- return fromBytes(encoder, clazz, encoder.followingKey(4, per));
- } else if (clazz.equals(Long.TYPE) || clazz.equals(Long.class)) {
- return fromBytes(encoder, clazz, encoder.followingKey(8, per));
- } else if (clazz.equals(Float.TYPE) || clazz.equals(Float.class)) {
- return fromBytes(encoder, clazz, encoder.followingKey(4, per));
- } else if (clazz.equals(Double.TYPE) || clazz.equals(Double.class)) {
- return fromBytes(encoder, clazz, encoder.followingKey(8, per));
- } else if (clazz.equals(String.class)) {
- throw new UnsupportedOperationException();
- } else if (clazz.equals(Utf8.class)) {
- return fromBytes(encoder, clazz, Arrays.copyOf(per, per.length + 1));
- }
-
- throw new IllegalArgumentException(UNKOWN + clazz.getName());
- }
-
- @Override
- public void flush() {
- try {
- if (batchWriter != null) {
- batchWriter.flush();
- }
- } catch (MutationsRejectedException e) {
- LOG.error(e.getMessage(), e);
- }
- }
-
- @Override
- public void close() {
- try {
- if (batchWriter != null) {
- batchWriter.close();
- batchWriter = null;
- }
- } catch (MutationsRejectedException e) {
- LOG.error(e.getMessage(), e);
- }
- }
+public class AccumuloStore<K, T extends PersistentBase> extends
DataStoreBase<K, T> {
+
+ protected static final String MOCK_PROPERTY = "accumulo.mock";
+ protected static final String INSTANCE_NAME_PROPERTY =
"accumulo.instance";
+ protected static final String ZOOKEEPERS_NAME_PROPERTY =
"accumulo.zookeepers";
+ protected static final String USERNAME_PROPERTY = "accumulo.user";
+ protected static final String PASSWORD_PROPERTY = "accumulo.password";
+ protected static final String DEFAULT_MAPPING_FILE =
"gora-accumulo-mapping.xml";
+
+ private final static String UNKOWN = "Unknown type ";
+
+ private Connector conn;
+ private BatchWriter batchWriter;
+ private AccumuloMapping mapping;
+ private Credentials credentials;
+ private Encoder encoder;
+
+ public static final Logger LOG =
LoggerFactory.getLogger(AccumuloStore.class);
+
+ public Object fromBytes(Schema schema, byte[] data) throws IOException {
+ Schema fromSchema = null;
+ if (schema.getType() == Type.UNION) {
+ try {
+ Decoder decoder =
DecoderFactory.get().binaryDecoder(data, null);
+ int unionIndex = decoder.readIndex();
+ List<Schema> possibleTypes = schema.getTypes();
+ fromSchema = possibleTypes.get(unionIndex);
+ Schema effectiveSchema =
possibleTypes.get(unionIndex);
+ if (effectiveSchema.getType() == Type.NULL) {
+ decoder.readNull();
+ return null;
+ } else {
+ data = decoder.readBytes(null).array();
+ }
+ } catch (IOException e) {
+ LOG.error(e.getMessage());
+ throw new GoraException("Error decoding union
type: ", e);
+ }
+ } else {
+ fromSchema = schema;
+ }
+ return fromBytes(encoder, fromSchema, data);
+ }
+
+ public static Object fromBytes(Encoder encoder, Schema schema, byte
data[]) throws IOException {
+ switch (schema.getType()) {
+ case BOOLEAN:
+ return encoder.decodeBoolean(data);
+ case DOUBLE:
+ return encoder.decodeDouble(data);
+ case FLOAT:
+ return encoder.decodeFloat(data);
+ case INT:
+ return encoder.decodeInt(data);
+ case LONG:
+ return encoder.decodeLong(data);
+ case STRING:
+ return new Utf8(data);
+ case BYTES:
+ return ByteBuffer.wrap(data);
+ case ENUM:
+ return AvroUtils.getEnumValue(schema,
encoder.decodeInt(data));
+ case ARRAY:
+ break;
+ case FIXED:
+ break;
+ case MAP:
+ break;
+ case NULL:
+ break;
+ case RECORD:
+ break;
+ case UNION:
+ break;
+ default:
+ break;
+ }
+ throw new IllegalArgumentException(UNKOWN + schema.getType());
+
+ }
+
+ private static byte[] getBytes(Text text) {
+ byte[] bytes = text.getBytes();
+ if (bytes.length != text.getLength()) {
+ bytes = new byte[text.getLength()];
+ System.arraycopy(text.getBytes(), 0, bytes, 0,
bytes.length);
+ }
+ return bytes;
+ }
+
+ public K fromBytes(Class<K> clazz, byte[] val) {
+ return fromBytes(encoder, clazz, val);
+ }
+
+ @SuppressWarnings("unchecked")
+ public static <K> K fromBytes(Encoder encoder, Class<K> clazz, byte[]
val) {
+ try {
+ if (clazz.equals(Byte.TYPE) ||
clazz.equals(Byte.class)) {
+ return (K)
Byte.valueOf(encoder.decodeByte(val));
+ } else if (clazz.equals(Boolean.TYPE) ||
clazz.equals(Boolean.class)) {
+ return (K)
Boolean.valueOf(encoder.decodeBoolean(val));
+ } else if (clazz.equals(Short.TYPE) ||
clazz.equals(Short.class)) {
+ return (K)
Short.valueOf(encoder.decodeShort(val));
+ } else if (clazz.equals(Integer.TYPE) ||
clazz.equals(Integer.class)) {
+ return (K)
Integer.valueOf(encoder.decodeInt(val));
+ } else if (clazz.equals(Long.TYPE) ||
clazz.equals(Long.class)) {
+ return (K)
Long.valueOf(encoder.decodeLong(val));
+ } else if (clazz.equals(Float.TYPE) ||
clazz.equals(Float.class)) {
+ return (K)
Float.valueOf(encoder.decodeFloat(val));
+ } else if (clazz.equals(Double.TYPE) ||
clazz.equals(Double.class)) {
+ return (K)
Double.valueOf(encoder.decodeDouble(val));
+ } else if (clazz.equals(String.class)) {
+ return (K) new String(val, "UTF-8");
+ } else if (clazz.equals(Utf8.class)) {
+ return (K) new Utf8(val);
+ }
+
+ throw new IllegalArgumentException(UNKOWN +
clazz.getName());
+ } catch (IOException ioe) {
+ LOG.error(ioe.getMessage());
+ throw new RuntimeException(ioe);
+ }
+ }
+
+ private static byte[] copyIfNeeded(byte b[], int offset, int len) {
+ if (len != b.length || offset != 0) {
+ byte[] copy = new byte[len];
+ System.arraycopy(b, offset, copy, 0, copy.length);
+ b = copy;
+ }
+ return b;
+ }
+
+ public byte[] toBytes(Schema toSchema, Object o) {
+ if (toSchema != null && toSchema.getType() == Type.UNION) {
+ ByteArrayOutputStream baos = new
ByteArrayOutputStream();
+ org.apache.avro.io.BinaryEncoder avroEncoder =
EncoderFactory.get().binaryEncoder(baos, null);
+ int unionIndex = 0;
+ try {
+ if (o == null) {
+ unionIndex =
firstNullSchemaTypeIndex(toSchema);
+ avroEncoder.writeIndex(unionIndex);
+ avroEncoder.writeNull();
+ } else {
+ unionIndex =
firstNotNullSchemaTypeIndex(toSchema);
+ avroEncoder.writeIndex(unionIndex);
+ avroEncoder.writeBytes(toBytes(o));
+ }
+ avroEncoder.flush();
+ return baos.toByteArray();
+ } catch (IOException e) {
+ LOG.error(e.getMessage());
+ return toBytes(o);
+ }
+ } else {
+ return toBytes(o);
+ }
+ }
+
+ private int firstNullSchemaTypeIndex(Schema toSchema) {
+ List<Schema> possibleTypes = toSchema.getTypes();
+ int unionIndex = 0;
+ for (int i = 0; i < possibleTypes.size(); i++) {
+ Type pType = possibleTypes.get(i).getType();
+ if (pType == Type.NULL) { // FIXME HUGE kludge to pass
tests
+ unionIndex = i;
+ break;
+ }
+ }
+ return unionIndex;
+ }
+
+ private int firstNotNullSchemaTypeIndex(Schema toSchema) {
+ List<Schema> possibleTypes = toSchema.getTypes();
+ int unionIndex = 0;
+ for (int i = 0; i < possibleTypes.size(); i++) {
+ Type pType = possibleTypes.get(i).getType();
+ if (pType != Type.NULL) { // FIXME HUGE kludge to pass
tests
+ unionIndex = i;
+ break;
+ }
+ }
+ return unionIndex;
+ }
+
+ public byte[] toBytes(Object o) {
+ return toBytes(encoder, o);
+ }
+
+ public static byte[] toBytes(Encoder encoder, Object o) {
+
+ try {
+ if (o instanceof String) {
+ return ((String) o).getBytes("UTF-8");
+ } else if (o instanceof Utf8) {
+ return copyIfNeeded(((Utf8) o).getBytes(), 0,
((Utf8) o).getByteLength());
+ } else if (o instanceof ByteBuffer) {
+ return copyIfNeeded(((ByteBuffer) o).array(),
+ ((ByteBuffer) o).arrayOffset()
+ ((ByteBuffer) o).position(), ((ByteBuffer) o).remaining());
+ } else if (o instanceof Long) {
+ return encoder.encodeLong((Long) o);
+ } else if (o instanceof Integer) {
+ return encoder.encodeInt((Integer) o);
+ } else if (o instanceof Short) {
+ return encoder.encodeShort((Short) o);
+ } else if (o instanceof Byte) {
+ return encoder.encodeByte((Byte) o);
+ } else if (o instanceof Boolean) {
+ return encoder.encodeBoolean((Boolean) o);
+ } else if (o instanceof Float) {
+ return encoder.encodeFloat((Float) o);
+ } else if (o instanceof Double) {
+ return encoder.encodeDouble((Double) o);
+ } else if (o instanceof Enum) {
+ return encoder.encodeInt(((Enum<?>)
o).ordinal());
+ }
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe);
+ }
+
+ throw new IllegalArgumentException(UNKOWN +
o.getClass().getName());
+ }
+
+ private BatchWriter getBatchWriter() throws IOException {
+ if (batchWriter == null)
+ try {
+ BatchWriterConfig batchWriterConfig = new
BatchWriterConfig();
+ batchWriterConfig.setMaxMemory(10000000);
+ batchWriterConfig.setMaxLatency(60000L,
TimeUnit.MILLISECONDS);
+ batchWriterConfig.setMaxWriteThreads(4);
+ batchWriter =
conn.createBatchWriter(mapping.tableName, batchWriterConfig);
+ } catch (TableNotFoundException e) {
+ throw new IOException(e);
+ }
+ return batchWriter;
+ }
+
+ /**
+ * Initialize the data store by reading the credentials, setting the
+ * client's properties up and reading the mapping file. Initialize is
called
+ * when then the call to
+ * {@link org.apache.gora.store.DataStoreFactory#createDataStore} is
made.
+ *
+ * @param keyClass
+ * @param persistentClass
+ * @param properties
+ */
+ @Override
+ public void initialize(Class<K> keyClass, Class<T> persistentClass,
Properties properties) {
+ try {
+ super.initialize(keyClass, persistentClass, properties);
+
+ String mock = DataStoreFactory.findProperty(properties,
this, MOCK_PROPERTY, null);
+ String mappingFile =
DataStoreFactory.getMappingFile(properties, this, DEFAULT_MAPPING_FILE);
+ String user = DataStoreFactory.findProperty(properties,
this, USERNAME_PROPERTY, null);
+ String password =
DataStoreFactory.findProperty(properties, this, PASSWORD_PROPERTY, null);
+
+ mapping = readMapping(mappingFile);
+
+ if (mapping.encoder == null ||
"".equals(mapping.encoder)) {
+ encoder = new BinaryEncoder();
+ } else {
+ try {
+ encoder = (Encoder)
getClass().getClassLoader().loadClass(mapping.encoder).newInstance();
+ } catch (InstantiationException |
IllegalAccessException | ClassNotFoundException e) {
+ throw new IOException(e);
+ }
+ }
+
+ try {
+ AuthenticationToken token = new
PasswordToken(password);
+ if (mock == null || !mock.equals("true")) {
+ String instance =
DataStoreFactory.findProperty(properties, this, INSTANCE_NAME_PROPERTY, null);
+ String zookeepers =
DataStoreFactory.findProperty(properties, this, ZOOKEEPERS_NAME_PROPERTY, null);
+ conn = new ZooKeeperInstance(instance,
zookeepers).getConnector(user, token);
+ } else {
+ conn = new
MockInstance().getConnector(user, token);
+ }
+ credentials = new Credentials(user, token);
+
+ if (autoCreateSchema && !schemaExists())
+ createSchema();
+ } catch (AccumuloException | AccumuloSecurityException
e) {
+ throw new IOException(e);
+ }
+ } catch (IOException e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+
+ protected AccumuloMapping readMapping(String filename) throws
IOException {
+ try {
+
+ AccumuloMapping mapping = new AccumuloMapping();
+
+ DocumentBuilder db =
DocumentBuilderFactory.newInstance().newDocumentBuilder();
+ Document dom =
db.parse(getClass().getClassLoader().getResourceAsStream(filename));
+
+ Element root = dom.getDocumentElement();
+
+ NodeList nl = root.getElementsByTagName("class");
+ for (int i = 0; i < nl.getLength(); i++) {
+
+ Element classElement = (Element) nl.item(i);
+ if
(classElement.getAttribute("keyClass").equals(keyClass.getCanonicalName())
+ &&
classElement.getAttribute("name").equals(persistentClass.getCanonicalName())) {
+
+ mapping.tableName =
getSchemaName(classElement.getAttribute("table"), persistentClass);
+ mapping.encoder =
classElement.getAttribute("encoder");
+
+ NodeList fields =
classElement.getElementsByTagName("field");
+ for (int j = 0; j < fields.getLength();
j++) {
+ Element fieldElement =
(Element) fields.item(j);
+
+ String name =
fieldElement.getAttribute("name");
+ String family =
fieldElement.getAttribute("family");
+ String qualifier =
fieldElement.getAttribute("qualifier");
+ if ("".equals(qualifier))
+ qualifier = null;
+
+ Pair<Text, Text> col = new
Pair<>(new Text(family),
+ qualifier ==
null ? null : new Text(qualifier));
+ mapping.fieldMap.put(name, col);
+ mapping.columnMap.put(col,
name);
+ }
+ }
+
+ }
+
+ if (mapping.tableName == null) {
+ throw new GoraException("Please define the
accumulo 'table' name mapping in " + filename + " for "
+ +
persistentClass.getCanonicalName());
+ }
+
+ nl = root.getElementsByTagName("table");
+ for (int i = 0; i < nl.getLength(); i++) {
+ Element tableElement = (Element) nl.item(i);
+ if
(tableElement.getAttribute("name").equals(mapping.tableName)) {
+ NodeList configs =
tableElement.getElementsByTagName("config");
+ for (int j = 0; j <
configs.getLength(); j++) {
+ Element configElement =
(Element) configs.item(j);
+ String key =
configElement.getAttribute("key");
+ String val =
configElement.getAttribute("value");
+ mapping.tableConfig.put(key,
val);
+ }
+ }
+ }
+
+ return mapping;
+ } catch (Exception ex) {
+ throw new IOException("Unable to read " + filename, ex);
+ }
+
+ }
+
+ @Override
+ public String getSchemaName() {
+ return mapping.tableName;
+ }
+
+ @Override
+ public void createSchema() {
+ try {
+ conn.tableOperations().create(mapping.tableName);
+ Set<Entry<String, String>> es =
mapping.tableConfig.entrySet();
+ for (Entry<String, String> entry : es) {
+
conn.tableOperations().setProperty(mapping.tableName, entry.getKey(),
entry.getValue());
+ }
+
+ } catch (AccumuloException | AccumuloSecurityException e) {
+ LOG.error(e.getMessage(), e);
+ } catch (TableExistsException e) {
+ LOG.debug(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void deleteSchema() {
+ try {
+ if (batchWriter != null)
+ batchWriter.close();
+ batchWriter = null;
+ conn.tableOperations().delete(mapping.tableName);
+ } catch (AccumuloException | AccumuloSecurityException |
TableNotFoundException e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public boolean schemaExists() {
+ return conn.tableOperations().exists(mapping.tableName);
+ }
+
+ public ByteSequence populate(Iterator<Entry<Key, Value>> iter, T
persistent) throws IOException {
+ ByteSequence row = null;
+
+ Map<Utf8, Object> currentMap = null;
+ List currentArray = null;
+ Text currentFam = null;
+ int currentPos = 0;
+ Schema currentSchema = null;
+ Field currentField = null;
+
+ BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(new
byte[0], null);
+
+ while (iter.hasNext()) {
+ Entry<Key, Value> entry = iter.next();
+
+ if (row == null) {
+ row = entry.getKey().getRowData();
+ }
+ byte[] val = entry.getValue().get();
+
+ Field field = fieldMap.get(getFieldName(entry));
+
+ if (currentMap != null) {
+ if
(currentFam.equals(entry.getKey().getColumnFamily())) {
+ currentMap.put(new
Utf8(entry.getKey().getColumnQualifierData().toArray()),
+
fromBytes(currentSchema, entry.getValue().get()));
+ continue;
+ } else {
+ persistent.put(currentPos, currentMap);
+ currentMap = null;
+ }
+ } else if (currentArray != null) {
+ if
(currentFam.equals(entry.getKey().getColumnFamily())) {
+
currentArray.add(fromBytes(currentSchema, entry.getValue().get()));
+ continue;
+ } else {
+ persistent.put(currentPos, new
GenericData.Array<T>(currentField.schema(), currentArray));
+ currentArray = null;
+ }
+ }
+
+ switch (field.schema().getType()) {
+ case MAP: // first entry only. Next are handled above
on the next
+ // loop
+ currentMap = new DirtyMapWrapper<>(new
HashMap<Utf8, Object>());
+ currentPos = field.pos();
+ currentFam = entry.getKey().getColumnFamily();
+ currentSchema = field.schema().getValueType();
+
+ currentMap.put(new
Utf8(entry.getKey().getColumnQualifierData().toArray()),
+ fromBytes(currentSchema,
entry.getValue().get()));
+ break;
+ case ARRAY:
+ currentArray = new DirtyListWrapper<>(new
ArrayList<>());
+ currentPos = field.pos();
+ currentFam = entry.getKey().getColumnFamily();
+ currentSchema = field.schema().getElementType();
+ currentField = field;
+
+ currentArray.add(fromBytes(currentSchema,
entry.getValue().get()));
+
+ break;
+ case UNION:// default value of null acts like union
with null
+ Schema effectiveSchema =
field.schema().getTypes().get(firstNotNullSchemaTypeIndex(field.schema()));
+ // map and array were coded without union index
so need to be
+ // read the same way
+ if (effectiveSchema.getType() == Type.ARRAY) {
+ currentArray = new
DirtyListWrapper<>(new ArrayList<>());
+ currentPos = field.pos();
+ currentFam =
entry.getKey().getColumnFamily();
+ currentSchema =
field.schema().getElementType();
+ currentField = field;
+
+
currentArray.add(fromBytes(currentSchema, entry.getValue().get()));
+ break;
+ } else if (effectiveSchema.getType() ==
Type.MAP) {
+ currentMap = new DirtyMapWrapper<>(new
HashMap<Utf8, Object>());
+ currentPos = field.pos();
+ currentFam =
entry.getKey().getColumnFamily();
+ currentSchema =
effectiveSchema.getValueType();
+
+ currentMap.put(new
Utf8(entry.getKey().getColumnQualifierData().toArray()),
+
fromBytes(currentSchema, entry.getValue().get()));
+ break;
+ }
+ // continue like a regular top-level union
+ case RECORD:
+ SpecificDatumReader<?> reader = new
SpecificDatumReader<Schema>(field.schema());
+ persistent.put(field.pos(), reader.read(null,
DecoderFactory.get().binaryDecoder(val, decoder)));
+ break;
+ default:
+ persistent.put(field.pos(),
fromBytes(field.schema(), entry.getValue().get()));
+ }
+ }
+
+ if (currentMap != null) {
+ persistent.put(currentPos, currentMap);
+ } else if (currentArray != null) {
+ persistent.put(currentPos, new
GenericData.Array<T>(currentField.schema(), currentArray));
+ }
+
+ persistent.clearDirty();
+
+ return row;
+ }
+
+ /**
+ * Retrieve field name from entry.
+ *
+ * @param entry
+ * The Key-Value entry
+ * @return String The field name
+ */
+ private String getFieldName(Entry<Key, Value> entry) {
+ String fieldName = mapping.columnMap
+ .get(new
Pair<>(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier()));
+ if (fieldName == null) {
+ fieldName = mapping.columnMap.get(new Pair<Text,
Text>(entry.getKey().getColumnFamily(), null));
+ }
+ return fieldName;
+ }
+
+ private void setFetchColumns(Scanner scanner, String[] fields) {
+ fields = getFieldsToQuery(fields);
+ for (String field : fields) {
+ Pair<Text, Text> col = mapping.fieldMap.get(field);
+ if (col != null) {
+ if (col.getSecond() == null) {
+
scanner.fetchColumnFamily(col.getFirst());
+ } else {
+ scanner.fetchColumn(col.getFirst(),
col.getSecond());
+ }
+ } else {
+ LOG.error("Mapping not found for field: {}",
field);
+ }
+ }
+ }
+
+ @Override
+ public T get(K key, String[] fields) {
+ try {
+ // TODO make isolated scanner optional?
+ Scanner scanner = new
IsolatedScanner(conn.createScanner(mapping.tableName, Authorizations.EMPTY));
+ Range rowRange = new Range(new Text(toBytes(key)));
+
+ scanner.setRange(rowRange);
+ setFetchColumns(scanner, fields);
+
+ T persistent = newPersistent();
+ ByteSequence row = populate(scanner.iterator(),
persistent);
+ if (row == null)
+ return null;
+ return persistent;
+ } catch (TableNotFoundException e) {
+ LOG.error(e.getMessage(), e);
+ return null;
+ } catch (IOException e) {
+ LOG.error(e.getMessage(), e);
+ return null;
+ }
+ }
+
+ @Override
+ public void put(K key, T val) {
+
+ try {
+ Mutation m = new Mutation(new Text(toBytes(key)));
+
+ Schema schema = val.getSchema();
+ List<Field> fields = schema.getFields();
+ int count = 0;
+
+ for (int i = 0; i < fields.size(); i++) {
+ if (!val.isDirty(i)) {
+ continue;
+ }
+ Field field = fields.get(i);
+
+ Object o = val.get(field.pos());
+
+ Pair<Text, Text> col =
mapping.fieldMap.get(field.name());
+
+ if (col == null) {
+ throw new GoraException("Please define
the gora to accumulo mapping for field " + field.name());
+ }
+
+ switch (field.schema().getType()) {
+ case MAP:
+ count = putMap(m, count,
field.schema().getValueType(), o, col, field.name());
+ break;
+ case ARRAY:
+ count = putArray(m, count, o, col,
field.name());
+ break;
+ case UNION: // default value of null acts like
union with null
+ Schema effectiveSchema =
field.schema().getTypes().get(firstNotNullSchemaTypeIndex(field.schema()));
+ // map and array need to compute
qualifier
+ if (effectiveSchema.getType() ==
Type.ARRAY) {
+ count = putArray(m, count, o,
col, field.name());
+ break;
+ } else if (effectiveSchema.getType() ==
Type.MAP) {
+ count = putMap(m, count,
effectiveSchema.getValueType(), o, col, field.name());
+ break;
+ }
+ // continue like a regular top-level
union
+ case RECORD:
+ final SpecificDatumWriter<Object>
writer = new SpecificDatumWriter<>(field.schema());
+ final byte[] byteData =
IOUtils.serialize(writer, o);
+ m.put(col.getFirst(), col.getSecond(),
new Value(byteData));
+ count++;
+ break;
+ default:
+ m.put(col.getFirst(), col.getSecond(),
new Value(toBytes(o)));
+ count++;
+ }
+
+ }
+
+ if (count > 0)
+ try {
+ getBatchWriter().addMutation(m);
+ } catch (MutationsRejectedException e) {
+ LOG.error(e.getMessage(), e);
+ }
+ } catch (IOException e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+
+ private int putMap(Mutation m, int count, Schema valueType, Object o,
Pair<Text, Text> col, String fieldName)
+ throws GoraException {
+
+ // First of all we delete map field on accumulo store
+ Text rowKey = new Text(m.getRow());
+ Query<K, T> query = newQuery();
+ query.setFields(fieldName);
+ query.setStartKey((K) rowKey.toString());
+ query.setEndKey((K) rowKey.toString());
+ deleteByQuery(query);
+ flush();
+ if (o == null) {
+ return 0;
+ }
+
+ Set<?> es = ((Map<?, ?>) o).entrySet();
+ for (Object entry : es) {
+ Object mapKey = ((Entry<?, ?>) entry).getKey();
+ Object mapVal = ((Entry<?, ?>) entry).getValue();
+ if ((o instanceof DirtyMapWrapper &&
((DirtyMapWrapper<?, ?>) o).isDirty())
+ || !(o instanceof DirtyMapWrapper)) {
+ m.put(col.getFirst(), new
Text(toBytes(mapKey)), new Value(toBytes(valueType, mapVal)));
+ count++;
+ }
+ // TODO map value deletion
+ }
+ return count;
+ }
+
+ private int putArray(Mutation m, int count, Object o, Pair<Text, Text>
col, String fieldName) {
+
+ // First of all we delete array field on accumulo store
+ Text rowKey = new Text(m.getRow());
+ Query<K, T> query = newQuery();
+ query.setFields(fieldName);
+ query.setStartKey((K) rowKey.toString());
+ query.setEndKey((K) rowKey.toString());
+ deleteByQuery(query);
+ flush();
+ if (o == null) {
+ return 0;
+ }
+
+ List<?> array = (List<?>) o; // both GenericArray and
DirtyListWrapper
+ int j = 0;
+ for (Object item : array) {
+ m.put(col.getFirst(), new Text(toBytes(j++)), new
Value(toBytes(item)));
+ count++;
+ }
+ return count;
+ }
+
+ @Override
+ public boolean delete(K key) {
+ Query<K, T> q = newQuery();
+ q.setKey(key);
+ return deleteByQuery(q) > 0;
+ }
+
+ @Override
+ public long deleteByQuery(Query<K, T> query) {
+ try {
+ Scanner scanner = createScanner(query);
+ // add iterator that drops values on the server side
+ scanner.addScanIterator(new
IteratorSetting(Integer.MAX_VALUE, SortedKeyIterator.class));
+ RowIterator iterator = new
RowIterator(scanner.iterator());
+
+ long count = 0;
+
+ while (iterator.hasNext()) {
+ Iterator<Entry<Key, Value>> row =
iterator.next();
+ Mutation m = null;
+ while (row.hasNext()) {
+ Entry<Key, Value> entry = row.next();
+ Key key = entry.getKey();
+ if (m == null)
+ m = new Mutation(key.getRow());
+ // TODO optimize to avoid continually
creating column vis?
+ // prob does not matter for empty
+ m.putDelete(key.getColumnFamily(),
key.getColumnQualifier(),
+ new
ColumnVisibility(key.getColumnVisibility()), key.getTimestamp());
+ }
+ getBatchWriter().addMutation(m);
+ count++;
+ }
+
+ return count;
+ } catch (TableNotFoundException e) {
+ // TODO return 0?
+ LOG.error(e.getMessage(), e);
+ return 0;
+ } catch (MutationsRejectedException e) {
+ LOG.error(e.getMessage(), e);
+ return 0;
+ } catch (IOException e) {
+ LOG.error(e.getMessage(), e);
+ return 0;
+ }
+ }
+
+ private Range createRange(Query<K, T> query) {
+ Text startRow = null;
+ Text endRow = null;
+
+ if (query.getStartKey() != null)
+ startRow = new Text(toBytes(query.getStartKey()));
+
+ if (query.getEndKey() != null)
+ endRow = new Text(toBytes(query.getEndKey()));
+
+ return new Range(startRow, true, endRow, true);
+
+ }
+
+ private Scanner createScanner(Query<K, T> query) throws
TableNotFoundException {
+ // TODO make isolated scanner optional?
+ Scanner scanner = new
IsolatedScanner(conn.createScanner(mapping.tableName, Authorizations.EMPTY));
+ setFetchColumns(scanner, query.getFields());
+
+ scanner.setRange(createRange(query));
+
+ if (query.getStartTime() != -1 || query.getEndTime() != -1) {
+ IteratorSetting is = new IteratorSetting(30,
TimestampFilter.class);
+ if (query.getStartTime() != -1)
+ TimestampFilter.setStart(is,
query.getStartTime(), true);
+ if (query.getEndTime() != -1)
+ TimestampFilter.setEnd(is, query.getEndTime(),
true);
+
+ scanner.addScanIterator(is);
+ }
+
+ return scanner;
+ }
+
+ /**
+ * Execute the query and return the result.
+ */
+ @Override
+ public Result<K, T> execute(Query<K, T> query) {
+ try {
+ Scanner scanner = createScanner(query);
+ return new AccumuloResult<>(this, query, scanner);
+ } catch (TableNotFoundException e) {
+ // TODO return empty result?
+ LOG.error(e.getMessage(), e);
+ return null;
+ }
+ }
+
+ @Override
+ public Query<K, T> newQuery() {
+ return new AccumuloQuery<>(this);
+ }
+
+ Text pad(Text key, int bytes) {
+ if (key.getLength() < bytes)
+ key = new Text(key);
+
+ while (key.getLength() < bytes) {
+ key.append(new byte[] { 0 }, 0, 1);
+ }
+
+ return key;
+ }
+
+ @Override
+ public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query)
throws IOException {
+ try {
+ TabletLocator tl;
+ if (conn instanceof MockConnector)
+ tl = new MockTabletLocator();
+ else
+ tl = TabletLocator.getLocator(
+ new
ClientContext(conn.getInstance(), credentials,
+
AccumuloConfiguration.getTableConfiguration(conn,
+
Tables.getTableId(conn.getInstance(), mapping.tableName))),
+ new
Text(Tables.getTableId(conn.getInstance(), mapping.tableName)));
+
+ Map<String, Map<KeyExtent, List<Range>>> binnedRanges =
new HashMap<>();
+
+ tl.invalidateCache();
+ while (tl.binRanges(
+ new ClientContext(conn.getInstance(),
credentials,
+
AccumuloConfiguration.getTableConfiguration(conn,
+
Tables.getTableId(conn.getInstance(), mapping.tableName))),
+
Collections.singletonList(createRange(query)), binnedRanges).size() > 0) {
+ // TODO log?
+ if (!Tables.exists(conn.getInstance(),
Tables.getTableId(conn.getInstance(), mapping.tableName)))
+ throw new
TableDeletedException(Tables.getTableId(conn.getInstance(), mapping.tableName));
+ else if
(Tables.getTableState(conn.getInstance(),
+
Tables.getTableId(conn.getInstance(), mapping.tableName)) == TableState.OFFLINE)
+ throw new
TableOfflineException(conn.getInstance(),
+
Tables.getTableId(conn.getInstance(), mapping.tableName));
+ UtilWaitThread.sleep(100);
+ tl.invalidateCache();
+ }
+
+ List<PartitionQuery<K, T>> ret = new ArrayList<>();
+
+ Text startRow = null;
+ Text endRow = null;
+ if (query.getStartKey() != null)
+ startRow = new
Text(toBytes(query.getStartKey()));
+ if (query.getEndKey() != null)
+ endRow = new Text(toBytes(query.getEndKey()));
+
+ // hadoop expects hostnames, accumulo keeps track of
IPs... so need
+ // to convert
+ HashMap<String, String> hostNameCache = new HashMap<>();
+
+ for (Entry<String, Map<KeyExtent, List<Range>>> entry :
binnedRanges.entrySet()) {
+ String ip = entry.getKey().split(":", 2)[0];
+ String location = hostNameCache.get(ip);
+ if (location == null) {
+ InetAddress inetAddress =
InetAddress.getByName(ip);
+ location = inetAddress.getHostName();
+ hostNameCache.put(ip, location);
+ }
+
+ Map<KeyExtent, List<Range>> tablets =
entry.getValue();
+ for (KeyExtent ke : tablets.keySet()) {
+
+ K startKey = null;
+ if (startRow == null ||
!ke.contains(startRow)) {
+ if (ke.getPrevEndRow() != null)
{
+ startKey =
followingKey(encoder, getKeyClass(), getBytes(ke.getPrevEndRow()));
+ }
+ } else {
+ startKey =
fromBytes(getKeyClass(), getBytes(startRow));
+ }
+
+ K endKey = null;
+ if (endRow == null ||
!ke.contains(endRow)) {
+ if (ke.getEndRow() != null)
+ endKey =
lastPossibleKey(encoder, getKeyClass(), getBytes(ke.getEndRow()));
+ } else {
+ endKey =
fromBytes(getKeyClass(), getBytes(endRow));
+ }
+
+ PartitionQueryImpl<K, T> pqi = new
PartitionQueryImpl<>(query, startKey, endKey, location);
+ pqi.setConf(getConf());
+ ret.add(pqi);
+ }
+ }
+
+ return ret;
+ } catch (TableNotFoundException | AccumuloException |
AccumuloSecurityException e) {
+ throw new IOException(e);
+ }
+
+ }
+
+ static <K> K lastPossibleKey(Encoder encoder, Class<K> clazz, byte[]
er) {
+
+ if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)) {
+ throw new UnsupportedOperationException();
+ } else if (clazz.equals(Boolean.TYPE) ||
clazz.equals(Boolean.class)) {
+ throw new UnsupportedOperationException();
+ } else if (clazz.equals(Short.TYPE) ||
clazz.equals(Short.class)) {
+ return fromBytes(encoder, clazz,
encoder.lastPossibleKey(2, er));
+ } else if (clazz.equals(Integer.TYPE) ||
clazz.equals(Integer.class)) {
+ return fromBytes(encoder, clazz,
encoder.lastPossibleKey(4, er));
+ } else if (clazz.equals(Long.TYPE) || clazz.equals(Long.class))
{
+ return fromBytes(encoder, clazz,
encoder.lastPossibleKey(8, er));
+ } else if (clazz.equals(Float.TYPE) ||
clazz.equals(Float.class)) {
+ return fromBytes(encoder, clazz,
encoder.lastPossibleKey(4, er));
+ } else if (clazz.equals(Double.TYPE) ||
clazz.equals(Double.class)) {
+ return fromBytes(encoder, clazz,
encoder.lastPossibleKey(8, er));
+ } else if (clazz.equals(String.class)) {
+ throw new UnsupportedOperationException();
+ } else if (clazz.equals(Utf8.class)) {
+ return fromBytes(encoder, clazz, er);
+ }
+
+ throw new IllegalArgumentException(UNKOWN + clazz.getName());
+ }
+
+ @SuppressWarnings("unchecked")
+ static <K> K followingKey(Encoder encoder, Class<K> clazz, byte[] per) {
+
+ if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)) {
+ return (K) Byte.valueOf(encoder.followingKey(1,
per)[0]);
+ } else if (clazz.equals(Boolean.TYPE) ||
clazz.equals(Boolean.class)) {
+ throw new UnsupportedOperationException();
+ } else if (clazz.equals(Short.TYPE) ||
clazz.equals(Short.class)) {
+ return fromBytes(encoder, clazz,
encoder.followingKey(2, per));
+ } else if (clazz.equals(Integer.TYPE) ||
clazz.equals(Integer.class)) {
+ return fromBytes(encoder, clazz,
encoder.followingKey(4, per));
+ } else if (clazz.equals(Long.TYPE) || clazz.equals(Long.class))
{
+ return fromBytes(encoder, clazz,
encoder.followingKey(8, per));
+ } else if (clazz.equals(Float.TYPE) ||
clazz.equals(Float.class)) {
+ return fromBytes(encoder, clazz,
encoder.followingKey(4, per));
+ } else if (clazz.equals(Double.TYPE) ||
clazz.equals(Double.class)) {
+ return fromBytes(encoder, clazz,
encoder.followingKey(8, per));
+ } else if (clazz.equals(String.class)) {
+ throw new UnsupportedOperationException();
+ } else if (clazz.equals(Utf8.class)) {
+ return fromBytes(encoder, clazz, Arrays.copyOf(per,
per.length + 1));
+ }
+
+ throw new IllegalArgumentException(UNKOWN + clazz.getName());
+ }
+
+ @Override
+ public void flush() {
+ try {
+ if (batchWriter != null) {
+ batchWriter.flush();
+ }
+ } catch (MutationsRejectedException e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void close() {
+ try {
+ if (batchWriter != null) {
+ batchWriter.close();
+ batchWriter = null;
+ }
+ } catch (MutationsRejectedException e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
}