http://git-wip-us.apache.org/repos/asf/gora/blob/4bbf52ee/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java ---------------------------------------------------------------------- diff --cc gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java index c0cd026,a4cddce..bac354b --- a/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java +++ b/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java @@@ -1,1042 -1,1064 +1,1044 @@@ --/** -- * Licensed to the Apache Software Foundation (ASF) under one or more -- * contributor license agreements. See the NOTICE file distributed with -- * this work for additional information regarding copyright ownership. -- * The ASF licenses this file to You under the Apache License, Version 2.0 -- * (the "License"); you may not use this file except in compliance with -- * the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * -- * Unless required by applicable law or agreed to in writing, software -- * distributed under the License is distributed on an "AS IS" BASIS, -- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- * See the License for the specific language governing permissions and -- * limitations under the License. -- */ --package org.apache.gora.accumulo.store; -- --import java.io.ByteArrayOutputStream; --import java.io.IOException; --import java.net.InetAddress; --import java.nio.ByteBuffer; --import java.util.ArrayList; --import java.util.Arrays; --import java.util.Collections; --import java.util.HashMap; --import java.util.Iterator; --import java.util.List; --import java.util.Map; --import java.util.Map.Entry; --import java.util.Properties; --import java.util.Set; --import java.util.concurrent.TimeUnit; -- --import javax.xml.parsers.DocumentBuilder; --import javax.xml.parsers.DocumentBuilderFactory; -- --import org.apache.accumulo.core.client.AccumuloException; --import org.apache.accumulo.core.client.AccumuloSecurityException; --import org.apache.accumulo.core.client.BatchWriter; --import org.apache.accumulo.core.client.BatchWriterConfig; --import org.apache.accumulo.core.client.Connector; --import org.apache.accumulo.core.client.IsolatedScanner; --import org.apache.accumulo.core.client.IteratorSetting; --import org.apache.accumulo.core.client.MutationsRejectedException; --import org.apache.accumulo.core.client.RowIterator; --import org.apache.accumulo.core.client.Scanner; --import org.apache.accumulo.core.client.TableDeletedException; --import org.apache.accumulo.core.client.TableExistsException; --import org.apache.accumulo.core.client.TableNotFoundException; --import org.apache.accumulo.core.client.TableOfflineException; --import org.apache.accumulo.core.client.ZooKeeperInstance; -import org.apache.accumulo.core.client.impl.ClientContext; --import org.apache.accumulo.core.client.impl.Tables; --import org.apache.accumulo.core.client.impl.TabletLocator; --import org.apache.accumulo.core.client.mock.MockConnector; --import org.apache.accumulo.core.client.mock.MockInstance; --import org.apache.accumulo.core.client.mock.impl.MockTabletLocator; --import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; --import org.apache.accumulo.core.client.security.tokens.PasswordToken; --import org.apache.accumulo.core.conf.AccumuloConfiguration; --import org.apache.accumulo.core.data.ByteSequence; --import org.apache.accumulo.core.data.Key; - import org.apache.accumulo.core.data.impl.KeyExtent; --import org.apache.accumulo.core.data.Mutation; --import org.apache.accumulo.core.data.Range; --import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.data.impl.KeyExtent; --import org.apache.accumulo.core.iterators.SortedKeyIterator; --import org.apache.accumulo.core.iterators.user.TimestampFilter; --import org.apache.accumulo.core.master.state.tables.TableState; --import org.apache.accumulo.core.security.Authorizations; --import org.apache.accumulo.core.security.ColumnVisibility; - import org.apache.accumulo.core.client.impl.ClientContext; --import org.apache.accumulo.core.client.impl.Credentials; --import org.apache.accumulo.core.util.Pair; --import org.apache.accumulo.core.util.UtilWaitThread; --import org.apache.avro.Schema; --import org.apache.avro.Schema.Field; --import org.apache.avro.Schema.Type; --import org.apache.avro.generic.GenericData; --import org.apache.avro.io.BinaryDecoder; --import org.apache.avro.io.Decoder; --import org.apache.avro.io.DecoderFactory; --import org.apache.avro.io.EncoderFactory; --import org.apache.avro.specific.SpecificDatumReader; --import org.apache.avro.specific.SpecificDatumWriter; --import org.apache.avro.util.Utf8; --import org.apache.gora.accumulo.encoders.BinaryEncoder; --import org.apache.gora.accumulo.encoders.Encoder; --import org.apache.gora.accumulo.query.AccumuloQuery; --import org.apache.gora.accumulo.query.AccumuloResult; --import org.apache.gora.persistency.impl.DirtyListWrapper; --import org.apache.gora.persistency.impl.DirtyMapWrapper; --import org.apache.gora.persistency.impl.PersistentBase; --import org.apache.gora.query.PartitionQuery; --import org.apache.gora.query.Query; --import org.apache.gora.query.Result; --import org.apache.gora.query.impl.PartitionQueryImpl; --import org.apache.gora.store.DataStoreFactory; --import org.apache.gora.store.impl.DataStoreBase; --import org.apache.gora.util.AvroUtils; --import org.apache.gora.util.GoraException; --import org.apache.gora.util.IOUtils; --import org.apache.hadoop.io.Text; --import org.slf4j.Logger; --import org.slf4j.LoggerFactory; --import org.w3c.dom.Document; --import org.w3c.dom.Element; --import org.w3c.dom.NodeList; -- --/** -- * Implementation of a Accumulo data store to be used by gora. -- * - * @param <K> class to be used for the key - * @param <T> class to be persisted within the store - * @param <K> - * class to be used for the key - * @param <T> - * class to be persisted within the store -- */ - public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T> { -public class AccumuloStore<K, T extends PersistentBase> extends DataStoreBase<K, T> { -- - protected static final String MOCK_PROPERTY = "accumulo.mock"; - protected static final String INSTANCE_NAME_PROPERTY = "accumulo.instance"; - protected static final String ZOOKEEPERS_NAME_PROPERTY = "accumulo.zookeepers"; - protected static final String USERNAME_PROPERTY = "accumulo.user"; - protected static final String PASSWORD_PROPERTY = "accumulo.password"; - protected static final String DEFAULT_MAPPING_FILE = "gora-accumulo-mapping.xml"; - protected static final String MOCK_PROPERTY = "accumulo.mock"; - protected static final String INSTANCE_NAME_PROPERTY = "accumulo.instance"; - protected static final String ZOOKEEPERS_NAME_PROPERTY = "accumulo.zookeepers"; - protected static final String USERNAME_PROPERTY = "accumulo.user"; - protected static final String PASSWORD_PROPERTY = "accumulo.password"; - protected static final String DEFAULT_MAPPING_FILE = "gora-accumulo-mapping.xml"; -- - private final static String UNKOWN = "Unknown type "; - private final static String UNKOWN = "Unknown type "; -- - private Connector conn; - private BatchWriter batchWriter; - private AccumuloMapping mapping; - private Credentials credentials; - private Encoder encoder; - private Connector conn; - private BatchWriter batchWriter; - private AccumuloMapping mapping; - private Credentials credentials; - private Encoder encoder; -- - public static final Logger LOG = LoggerFactory.getLogger(AccumuloStore.class); - public static final Logger LOG = LoggerFactory.getLogger(AccumuloStore.class); -- - public Object fromBytes(Schema schema, byte[] data) throws IOException { - Schema fromSchema = null; - if (schema.getType() == Type.UNION) { - try { - Decoder decoder = DecoderFactory.get().binaryDecoder(data, null); - int unionIndex = decoder.readIndex(); - List<Schema> possibleTypes = schema.getTypes(); - fromSchema = possibleTypes.get(unionIndex); - Schema effectiveSchema = possibleTypes.get(unionIndex); - if (effectiveSchema.getType() == Type.NULL) { - decoder.readNull(); - return null; - } else { - data = decoder.readBytes(null).array(); - } - } catch (IOException e) { - LOG.error(e.getMessage()); - throw new GoraException("Error decoding union type: ", e); - } - } else { - fromSchema = schema; - } - return fromBytes(encoder, fromSchema, data); - } - public Object fromBytes(Schema schema, byte[] data) throws IOException { - Schema fromSchema = null; - if (schema.getType() == Type.UNION) { - try { - Decoder decoder = DecoderFactory.get().binaryDecoder(data, null); - int unionIndex = decoder.readIndex(); - List<Schema> possibleTypes = schema.getTypes(); - fromSchema = possibleTypes.get(unionIndex); - Schema effectiveSchema = possibleTypes.get(unionIndex); - if (effectiveSchema.getType() == Type.NULL) { - decoder.readNull(); - return null; - } else { - data = decoder.readBytes(null).array(); - } - } catch (IOException e) { - LOG.error(e.getMessage()); - throw new GoraException("Error decoding union type: ", e); - } - } else { - fromSchema = schema; - } - return fromBytes(encoder, fromSchema, data); - } -- - public static Object fromBytes(Encoder encoder, Schema schema, byte data[]) throws IOException { - switch (schema.getType()) { - case BOOLEAN: - return encoder.decodeBoolean(data); - case DOUBLE: - return encoder.decodeDouble(data); - case FLOAT: - return encoder.decodeFloat(data); - case INT: - return encoder.decodeInt(data); - case LONG: - return encoder.decodeLong(data); - case STRING: - return new Utf8(data); - case BYTES: - return ByteBuffer.wrap(data); - case ENUM: - return AvroUtils.getEnumValue(schema, encoder.decodeInt(data)); - case ARRAY: - break; - case FIXED: - break; - case MAP: - break; - case NULL: - break; - case RECORD: - break; - case UNION: - break; - default: - break; - } - throw new IllegalArgumentException(UNKOWN + schema.getType()); - public static Object fromBytes(Encoder encoder, Schema schema, byte data[]) throws IOException { - switch (schema.getType()) { - case BOOLEAN: - return encoder.decodeBoolean(data); - case DOUBLE: - return encoder.decodeDouble(data); - case FLOAT: - return encoder.decodeFloat(data); - case INT: - return encoder.decodeInt(data); - case LONG: - return encoder.decodeLong(data); - case STRING: - return new Utf8(data); - case BYTES: - return ByteBuffer.wrap(data); - case ENUM: - return AvroUtils.getEnumValue(schema, encoder.decodeInt(data)); - case ARRAY: - break; - case FIXED: - break; - case MAP: - break; - case NULL: - break; - case RECORD: - break; - case UNION: - break; - default: - break; - } - throw new IllegalArgumentException(UNKOWN + schema.getType()); -- - } - } -- - private static byte[] getBytes(Text text) { - byte[] bytes = text.getBytes(); - if (bytes.length != text.getLength()) { - bytes = new byte[text.getLength()]; - System.arraycopy(text.getBytes(), 0, bytes, 0, bytes.length); - } - return bytes; - } - private static byte[] getBytes(Text text) { - byte[] bytes = text.getBytes(); - if (bytes.length != text.getLength()) { - bytes = new byte[text.getLength()]; - System.arraycopy(text.getBytes(), 0, bytes, 0, bytes.length); - } - return bytes; - } -- - public K fromBytes(Class<K> clazz, byte[] val) { - return fromBytes(encoder, clazz, val); - } - public K fromBytes(Class<K> clazz, byte[] val) { - return fromBytes(encoder, clazz, val); - } -- - @SuppressWarnings("unchecked") - public static <K> K fromBytes(Encoder encoder, Class<K> clazz, byte[] val) { - try { - if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)) { - return (K) Byte.valueOf(encoder.decodeByte(val)); - } else if (clazz.equals(Boolean.TYPE) || clazz.equals(Boolean.class)) { - return (K) Boolean.valueOf(encoder.decodeBoolean(val)); - } else if (clazz.equals(Short.TYPE) || clazz.equals(Short.class)) { - return (K) Short.valueOf(encoder.decodeShort(val)); - } else if (clazz.equals(Integer.TYPE) || clazz.equals(Integer.class)) { - return (K) Integer.valueOf(encoder.decodeInt(val)); - } else if (clazz.equals(Long.TYPE) || clazz.equals(Long.class)) { - return (K) Long.valueOf(encoder.decodeLong(val)); - } else if (clazz.equals(Float.TYPE) || clazz.equals(Float.class)) { - return (K) Float.valueOf(encoder.decodeFloat(val)); - } else if (clazz.equals(Double.TYPE) || clazz.equals(Double.class)) { - return (K) Double.valueOf(encoder.decodeDouble(val)); - } else if (clazz.equals(String.class)) { - return (K) new String(val, "UTF-8"); - } else if (clazz.equals(Utf8.class)) { - return (K) new Utf8(val); - } - @SuppressWarnings("unchecked") - public static <K> K fromBytes(Encoder encoder, Class<K> clazz, byte[] val) { - try { - if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)) { - return (K) Byte.valueOf(encoder.decodeByte(val)); - } else if (clazz.equals(Boolean.TYPE) || clazz.equals(Boolean.class)) { - return (K) Boolean.valueOf(encoder.decodeBoolean(val)); - } else if (clazz.equals(Short.TYPE) || clazz.equals(Short.class)) { - return (K) Short.valueOf(encoder.decodeShort(val)); - } else if (clazz.equals(Integer.TYPE) || clazz.equals(Integer.class)) { - return (K) Integer.valueOf(encoder.decodeInt(val)); - } else if (clazz.equals(Long.TYPE) || clazz.equals(Long.class)) { - return (K) Long.valueOf(encoder.decodeLong(val)); - } else if (clazz.equals(Float.TYPE) || clazz.equals(Float.class)) { - return (K) Float.valueOf(encoder.decodeFloat(val)); - } else if (clazz.equals(Double.TYPE) || clazz.equals(Double.class)) { - return (K) Double.valueOf(encoder.decodeDouble(val)); - } else if (clazz.equals(String.class)) { - return (K) new String(val, "UTF-8"); - } else if (clazz.equals(Utf8.class)) { - return (K) new Utf8(val); - } -- - throw new IllegalArgumentException(UNKOWN + clazz.getName()); - } catch (IOException ioe) { - LOG.error(ioe.getMessage()); - throw new RuntimeException(ioe); - } - } - throw new IllegalArgumentException(UNKOWN + clazz.getName()); - } catch (IOException ioe) { - LOG.error(ioe.getMessage()); - throw new RuntimeException(ioe); - } - } -- - private static byte[] copyIfNeeded(byte b[], int offset, int len) { - if (len != b.length || offset != 0) { - byte[] copy = new byte[len]; - System.arraycopy(b, offset, copy, 0, copy.length); - b = copy; - } - return b; - } - private static byte[] copyIfNeeded(byte b[], int offset, int len) { - if (len != b.length || offset != 0) { - byte[] copy = new byte[len]; - System.arraycopy(b, offset, copy, 0, copy.length); - b = copy; - } - return b; - } -- - public byte[] toBytes(Schema toSchema, Object o) { - if (toSchema != null && toSchema.getType() == Type.UNION) { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - org.apache.avro.io.BinaryEncoder avroEncoder = EncoderFactory.get().binaryEncoder(baos, null); - int unionIndex = 0; - try { - if (o == null) { - unionIndex = firstNullSchemaTypeIndex(toSchema); - avroEncoder.writeIndex(unionIndex); - avroEncoder.writeNull(); - } else { - unionIndex = firstNotNullSchemaTypeIndex(toSchema); - avroEncoder.writeIndex(unionIndex); - avroEncoder.writeBytes(toBytes(o)); - } - avroEncoder.flush(); - return baos.toByteArray(); - } catch (IOException e) { - LOG.error(e.getMessage()); - return toBytes(o); - } - } else { - return toBytes(o); - } - } - public byte[] toBytes(Schema toSchema, Object o) { - if (toSchema != null && toSchema.getType() == Type.UNION) { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - org.apache.avro.io.BinaryEncoder avroEncoder = EncoderFactory.get().binaryEncoder(baos, null); - int unionIndex = 0; - try { - if (o == null) { - unionIndex = firstNullSchemaTypeIndex(toSchema); - avroEncoder.writeIndex(unionIndex); - avroEncoder.writeNull(); - } else { - unionIndex = firstNotNullSchemaTypeIndex(toSchema); - avroEncoder.writeIndex(unionIndex); - avroEncoder.writeBytes(toBytes(o)); - } - avroEncoder.flush(); - return baos.toByteArray(); - } catch (IOException e) { - LOG.error(e.getMessage()); - return toBytes(o); - } - } else { - return toBytes(o); - } - } -- - private int firstNullSchemaTypeIndex(Schema toSchema) { - List<Schema> possibleTypes = toSchema.getTypes(); - int unionIndex = 0; - for (int i = 0; i < possibleTypes.size(); i++ ) { - Type pType = possibleTypes.get(i).getType(); - if (pType == Type.NULL) { // FIXME HUGE kludge to pass tests - unionIndex = i; break; - } - } - return unionIndex; - } - private int firstNullSchemaTypeIndex(Schema toSchema) { - List<Schema> possibleTypes = toSchema.getTypes(); - int unionIndex = 0; - for (int i = 0; i < possibleTypes.size(); i++) { - Type pType = possibleTypes.get(i).getType(); - if (pType == Type.NULL) { // FIXME HUGE kludge to pass tests - unionIndex = i; - break; - } - } - return unionIndex; - } -- - private int firstNotNullSchemaTypeIndex(Schema toSchema) { - List<Schema> possibleTypes = toSchema.getTypes(); - int unionIndex = 0; - for (int i = 0; i < possibleTypes.size(); i++ ) { - Type pType = possibleTypes.get(i).getType(); - if (pType != Type.NULL) { // FIXME HUGE kludge to pass tests - unionIndex = i; break; - } - } - return unionIndex; - } - private int firstNotNullSchemaTypeIndex(Schema toSchema) { - List<Schema> possibleTypes = toSchema.getTypes(); - int unionIndex = 0; - for (int i = 0; i < possibleTypes.size(); i++) { - Type pType = possibleTypes.get(i).getType(); - if (pType != Type.NULL) { // FIXME HUGE kludge to pass tests - unionIndex = i; - break; - } - } - return unionIndex; - } -- - public byte[] toBytes(Object o) { - return toBytes(encoder, o); - } - public byte[] toBytes(Object o) { - return toBytes(encoder, o); - } -- - public static byte[] toBytes(Encoder encoder, Object o) { - public static byte[] toBytes(Encoder encoder, Object o) { -- - try { - if (o instanceof String) { - return ((String) o).getBytes("UTF-8"); - } else if (o instanceof Utf8) { - return copyIfNeeded(((Utf8) o).getBytes(), 0, ((Utf8) o).getByteLength()); - } else if (o instanceof ByteBuffer) { - return copyIfNeeded(((ByteBuffer) o).array(), ((ByteBuffer) o).arrayOffset() + ((ByteBuffer) o).position(), ((ByteBuffer) o).remaining()); - } else if (o instanceof Long) { - return encoder.encodeLong((Long) o); - } else if (o instanceof Integer) { - return encoder.encodeInt((Integer) o); - } else if (o instanceof Short) { - return encoder.encodeShort((Short) o); - } else if (o instanceof Byte) { - return encoder.encodeByte((Byte) o); - } else if (o instanceof Boolean) { - return encoder.encodeBoolean((Boolean) o); - } else if (o instanceof Float) { - return encoder.encodeFloat((Float) o); - } else if (o instanceof Double) { - return encoder.encodeDouble((Double) o); - } else if (o instanceof Enum) { - return encoder.encodeInt(((Enum<?>) o).ordinal()); - } - } catch (IOException ioe) { - throw new RuntimeException(ioe); - } - try { - if (o instanceof String) { - return ((String) o).getBytes("UTF-8"); - } else if (o instanceof Utf8) { - return copyIfNeeded(((Utf8) o).getBytes(), 0, ((Utf8) o).getByteLength()); - } else if (o instanceof ByteBuffer) { - return copyIfNeeded(((ByteBuffer) o).array(), - ((ByteBuffer) o).arrayOffset() + ((ByteBuffer) o).position(), ((ByteBuffer) o).remaining()); - } else if (o instanceof Long) { - return encoder.encodeLong((Long) o); - } else if (o instanceof Integer) { - return encoder.encodeInt((Integer) o); - } else if (o instanceof Short) { - return encoder.encodeShort((Short) o); - } else if (o instanceof Byte) { - return encoder.encodeByte((Byte) o); - } else if (o instanceof Boolean) { - return encoder.encodeBoolean((Boolean) o); - } else if (o instanceof Float) { - return encoder.encodeFloat((Float) o); - } else if (o instanceof Double) { - return encoder.encodeDouble((Double) o); - } else if (o instanceof Enum) { - return encoder.encodeInt(((Enum<?>) o).ordinal()); - } - } catch (IOException ioe) { - throw new RuntimeException(ioe); - } -- - throw new IllegalArgumentException(UNKOWN + o.getClass().getName()); - } - throw new IllegalArgumentException(UNKOWN + o.getClass().getName()); - } -- - private BatchWriter getBatchWriter() throws IOException { - if (batchWriter == null) - try { - BatchWriterConfig batchWriterConfig = new BatchWriterConfig(); - batchWriterConfig.setMaxMemory(10000000); - batchWriterConfig.setMaxLatency(60000L, TimeUnit.MILLISECONDS); - batchWriterConfig.setMaxWriteThreads(4); - batchWriter = conn.createBatchWriter(mapping.tableName, batchWriterConfig); - } catch (TableNotFoundException e) { - throw new IOException(e); - } - return batchWriter; - } - private BatchWriter getBatchWriter() throws IOException { - if (batchWriter == null) - try { - BatchWriterConfig batchWriterConfig = new BatchWriterConfig(); - batchWriterConfig.setMaxMemory(10000000); - batchWriterConfig.setMaxLatency(60000L, TimeUnit.MILLISECONDS); - batchWriterConfig.setMaxWriteThreads(4); - batchWriter = conn.createBatchWriter(mapping.tableName, batchWriterConfig); - } catch (TableNotFoundException e) { - throw new IOException(e); - } - return batchWriter; - } -- - /** - * Initialize the data store by reading the credentials, setting the client's properties up and - * reading the mapping file. Initialize is called when then the call to - * {@link org.apache.gora.store.DataStoreFactory#createDataStore} is made. - * - * @param keyClass - * @param persistentClass - * @param properties - */ - @Override - public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) { - try{ - super.initialize(keyClass, persistentClass, properties); - /** - * Initialize the data store by reading the credentials, setting the - * client's properties up and reading the mapping file. Initialize is called - * when then the call to - * {@link org.apache.gora.store.DataStoreFactory#createDataStore} is made. - * - * @param keyClass - * @param persistentClass - * @param properties - */ - @Override - public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) { - try { - super.initialize(keyClass, persistentClass, properties); -- - String mock = DataStoreFactory.findProperty(properties, this, MOCK_PROPERTY, null); - String mappingFile = DataStoreFactory.getMappingFile(properties, this, DEFAULT_MAPPING_FILE); - String user = DataStoreFactory.findProperty(properties, this, USERNAME_PROPERTY, null); - String password = DataStoreFactory.findProperty(properties, this, PASSWORD_PROPERTY, null); - String mock = DataStoreFactory.findProperty(properties, this, MOCK_PROPERTY, null); - String mappingFile = DataStoreFactory.getMappingFile(properties, this, DEFAULT_MAPPING_FILE); - String user = DataStoreFactory.findProperty(properties, this, USERNAME_PROPERTY, null); - String password = DataStoreFactory.findProperty(properties, this, PASSWORD_PROPERTY, null); -- - mapping = readMapping(mappingFile); - mapping = readMapping(mappingFile); -- - if (mapping.encoder == null || "".equals(mapping.encoder)) { - encoder = new BinaryEncoder(); - } else { - try { - encoder = (Encoder) getClass().getClassLoader().loadClass(mapping.encoder).newInstance(); - } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) { - throw new IOException(e); - } - } - if (mapping.encoder == null || "".equals(mapping.encoder)) { - encoder = new BinaryEncoder(); - } else { - try { - encoder = (Encoder) getClass().getClassLoader().loadClass(mapping.encoder).newInstance(); - } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) { - throw new IOException(e); - } - } -- - try { - AuthenticationToken token = new PasswordToken(password); - if (mock == null || !mock.equals("true")) { - String instance = DataStoreFactory.findProperty(properties, this, INSTANCE_NAME_PROPERTY, null); - String zookeepers = DataStoreFactory.findProperty(properties, this, ZOOKEEPERS_NAME_PROPERTY, null); - conn = new ZooKeeperInstance(instance, zookeepers).getConnector(user, token); - } else { - conn = new MockInstance().getConnector(user, token); - } - credentials = new Credentials(user, token); - try { - AuthenticationToken token = new PasswordToken(password); - if (mock == null || !mock.equals("true")) { - String instance = DataStoreFactory.findProperty(properties, this, INSTANCE_NAME_PROPERTY, null); - String zookeepers = DataStoreFactory.findProperty(properties, this, ZOOKEEPERS_NAME_PROPERTY, null); - conn = new ZooKeeperInstance(instance, zookeepers).getConnector(user, token); - } else { - conn = new MockInstance().getConnector(user, token); - } - credentials = new Credentials(user, token); -- - if (autoCreateSchema && !schemaExists()) - createSchema(); - } catch (AccumuloException | AccumuloSecurityException e) { - throw new IOException(e); - } - } catch(IOException e){ - LOG.error(e.getMessage(), e); - } - } - if (autoCreateSchema && !schemaExists()) - createSchema(); - } catch (AccumuloException | AccumuloSecurityException e) { - throw new IOException(e); - } - } catch (IOException e) { - LOG.error(e.getMessage(), e); - } - } -- - protected AccumuloMapping readMapping(String filename) throws IOException { - try { - protected AccumuloMapping readMapping(String filename) throws IOException { - try { -- - AccumuloMapping mapping = new AccumuloMapping(); - AccumuloMapping mapping = new AccumuloMapping(); -- - DocumentBuilder db = DocumentBuilderFactory.newInstance().newDocumentBuilder(); - Document dom = db.parse(getClass().getClassLoader().getResourceAsStream(filename)); - DocumentBuilder db = DocumentBuilderFactory.newInstance().newDocumentBuilder(); - Document dom = db.parse(getClass().getClassLoader().getResourceAsStream(filename)); -- - Element root = dom.getDocumentElement(); - Element root = dom.getDocumentElement(); -- - NodeList nl = root.getElementsByTagName("class"); - for (int i = 0; i < nl.getLength(); i++) { - NodeList nl = root.getElementsByTagName("class"); - for (int i = 0; i < nl.getLength(); i++) { -- - Element classElement = (Element) nl.item(i); - if (classElement.getAttribute("keyClass").equals(keyClass.getCanonicalName()) - && classElement.getAttribute("name").equals(persistentClass.getCanonicalName())) { - Element classElement = (Element) nl.item(i); - if (classElement.getAttribute("keyClass").equals(keyClass.getCanonicalName()) - && classElement.getAttribute("name").equals(persistentClass.getCanonicalName())) { -- - mapping.tableName = getSchemaName(classElement.getAttribute("table"), persistentClass); - mapping.encoder = classElement.getAttribute("encoder"); - mapping.tableName = getSchemaName(classElement.getAttribute("table"), persistentClass); - mapping.encoder = classElement.getAttribute("encoder"); -- - NodeList fields = classElement.getElementsByTagName("field"); - for (int j = 0; j < fields.getLength(); j++) { - Element fieldElement = (Element) fields.item(j); - NodeList fields = classElement.getElementsByTagName("field"); - for (int j = 0; j < fields.getLength(); j++) { - Element fieldElement = (Element) fields.item(j); -- - String name = fieldElement.getAttribute("name"); - String family = fieldElement.getAttribute("family"); - String qualifier = fieldElement.getAttribute("qualifier"); - if ("".equals(qualifier)) - qualifier = null; - String name = fieldElement.getAttribute("name"); - String family = fieldElement.getAttribute("family"); - String qualifier = fieldElement.getAttribute("qualifier"); - if ("".equals(qualifier)) - qualifier = null; -- - Pair<Text,Text> col = new Pair<>(new Text(family), qualifier == null ? null : new Text(qualifier)); - mapping.fieldMap.put(name, col); - mapping.columnMap.put(col, name); - } - } - Pair<Text, Text> col = new Pair<>(new Text(family), - qualifier == null ? null : new Text(qualifier)); - mapping.fieldMap.put(name, col); - mapping.columnMap.put(col, name); - } - } -- - } - } -- - if (mapping.tableName == null) { - throw new GoraException("Please define the accumulo 'table' name mapping in " + filename + " for " + persistentClass.getCanonicalName()); - } - if (mapping.tableName == null) { - throw new GoraException("Please define the accumulo 'table' name mapping in " + filename + " for " - + persistentClass.getCanonicalName()); - } -- - nl = root.getElementsByTagName("table"); - for (int i = 0; i < nl.getLength(); i++) { - Element tableElement = (Element) nl.item(i); - if (tableElement.getAttribute("name").equals(mapping.tableName)) { - NodeList configs = tableElement.getElementsByTagName("config"); - for (int j = 0; j < configs.getLength(); j++) { - Element configElement = (Element) configs.item(j); - String key = configElement.getAttribute("key"); - String val = configElement.getAttribute("value"); - mapping.tableConfig.put(key, val); - } - } - } - nl = root.getElementsByTagName("table"); - for (int i = 0; i < nl.getLength(); i++) { - Element tableElement = (Element) nl.item(i); - if (tableElement.getAttribute("name").equals(mapping.tableName)) { - NodeList configs = tableElement.getElementsByTagName("config"); - for (int j = 0; j < configs.getLength(); j++) { - Element configElement = (Element) configs.item(j); - String key = configElement.getAttribute("key"); - String val = configElement.getAttribute("value"); - mapping.tableConfig.put(key, val); - } - } - } -- - return mapping; - } catch (Exception ex) { - throw new IOException("Unable to read " + filename, ex); - } - return mapping; - } catch (Exception ex) { - throw new IOException("Unable to read " + filename, ex); - } -- - } - } -- - @Override - public String getSchemaName() { - return mapping.tableName; - } - @Override - public String getSchemaName() { - return mapping.tableName; - } -- - @Override - public void createSchema() { - try { - conn.tableOperations().create(mapping.tableName); - Set<Entry<String,String>> es = mapping.tableConfig.entrySet(); - for (Entry<String,String> entry : es) { - conn.tableOperations().setProperty(mapping.tableName, entry.getKey(), entry.getValue()); - } - @Override - public void createSchema() { - try { - conn.tableOperations().create(mapping.tableName); - Set<Entry<String, String>> es = mapping.tableConfig.entrySet(); - for (Entry<String, String> entry : es) { - conn.tableOperations().setProperty(mapping.tableName, entry.getKey(), entry.getValue()); - } -- - } catch (AccumuloException | AccumuloSecurityException e) { - LOG.error(e.getMessage(), e); - } catch (TableExistsException e) { - LOG.debug(e.getMessage(), e); - } - } - } catch (AccumuloException | AccumuloSecurityException e) { - LOG.error(e.getMessage(), e); - } catch (TableExistsException e) { - LOG.debug(e.getMessage(), e); - } - } -- - @Override - public void deleteSchema() { - try { - if (batchWriter != null) - batchWriter.close(); - batchWriter = null; - conn.tableOperations().delete(mapping.tableName); - } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) { - LOG.error(e.getMessage(), e); - } - } - @Override - public void deleteSchema() { - try { - if (batchWriter != null) - batchWriter.close(); - batchWriter = null; - conn.tableOperations().delete(mapping.tableName); - } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) { - LOG.error(e.getMessage(), e); - } - } -- - @Override - public boolean schemaExists() { - return conn.tableOperations().exists(mapping.tableName); - } - @Override - public boolean schemaExists() { - return conn.tableOperations().exists(mapping.tableName); - } -- - public ByteSequence populate(Iterator<Entry<Key,Value>> iter, T persistent) throws IOException { - ByteSequence row = null; - public ByteSequence populate(Iterator<Entry<Key, Value>> iter, T persistent) throws IOException { - ByteSequence row = null; -- - Map<Utf8, Object> currentMap = null; - List currentArray = null; - Text currentFam = null; - int currentPos = 0; - Schema currentSchema = null; - Field currentField = null; - Map<Utf8, Object> currentMap = null; - List currentArray = null; - Text currentFam = null; - int currentPos = 0; - Schema currentSchema = null; - Field currentField = null; -- - BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(new byte[0], null); - BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(new byte[0], null); -- - while (iter.hasNext()) { - Entry<Key,Value> entry = iter.next(); - while (iter.hasNext()) { - Entry<Key, Value> entry = iter.next(); -- - if (row == null) { - row = entry.getKey().getRowData(); - } - byte[] val = entry.getValue().get(); - if (row == null) { - row = entry.getKey().getRowData(); - } - byte[] val = entry.getValue().get(); -- - Field field = fieldMap.get(getFieldName(entry)); - Field field = fieldMap.get(getFieldName(entry)); -- - if (currentMap != null) { - if (currentFam.equals(entry.getKey().getColumnFamily())) { - currentMap.put(new Utf8(entry.getKey().getColumnQualifierData().toArray()), - fromBytes(currentSchema, entry.getValue().get())); - continue; - } else { - persistent.put(currentPos, currentMap); - currentMap = null; - } - } else if (currentArray != null) { - if (currentFam.equals(entry.getKey().getColumnFamily())) { - currentArray.add(fromBytes(currentSchema, entry.getValue().get())); - continue; - } else { - persistent.put(currentPos, new GenericData.Array<T>(currentField.schema(), currentArray)); - currentArray = null; - } - } - if (currentMap != null) { - if (currentFam.equals(entry.getKey().getColumnFamily())) { - currentMap.put(new Utf8(entry.getKey().getColumnQualifierData().toArray()), - fromBytes(currentSchema, entry.getValue().get())); - continue; - } else { - persistent.put(currentPos, currentMap); - currentMap = null; - } - } else if (currentArray != null) { - if (currentFam.equals(entry.getKey().getColumnFamily())) { - currentArray.add(fromBytes(currentSchema, entry.getValue().get())); - continue; - } else { - persistent.put(currentPos, new GenericData.Array<T>(currentField.schema(), currentArray)); - currentArray = null; - } - } -- - switch (field.schema().getType()) { - case MAP: // first entry only. Next are handled above on the next loop - currentMap = new DirtyMapWrapper<>(new HashMap<Utf8, Object>()); - currentPos = field.pos(); - currentFam = entry.getKey().getColumnFamily(); - currentSchema = field.schema().getValueType(); - switch (field.schema().getType()) { - case MAP: // first entry only. Next are handled above on the next - // loop - currentMap = new DirtyMapWrapper<>(new HashMap<Utf8, Object>()); - currentPos = field.pos(); - currentFam = entry.getKey().getColumnFamily(); - currentSchema = field.schema().getValueType(); -- - currentMap.put(new Utf8(entry.getKey().getColumnQualifierData().toArray()), - fromBytes(currentSchema, entry.getValue().get())); - break; - case ARRAY: - currentArray = new DirtyListWrapper<>(new ArrayList<>()); - currentPos = field.pos(); - currentFam = entry.getKey().getColumnFamily(); - currentSchema = field.schema().getElementType(); - currentField = field; - currentMap.put(new Utf8(entry.getKey().getColumnQualifierData().toArray()), - fromBytes(currentSchema, entry.getValue().get())); - break; - case ARRAY: - currentArray = new DirtyListWrapper<>(new ArrayList<>()); - currentPos = field.pos(); - currentFam = entry.getKey().getColumnFamily(); - currentSchema = field.schema().getElementType(); - currentField = field; -- - currentArray.add(fromBytes(currentSchema, entry.getValue().get())); - currentArray.add(fromBytes(currentSchema, entry.getValue().get())); -- - break; - case UNION:// default value of null acts like union with null - Schema effectiveSchema = field.schema().getTypes() - .get(firstNotNullSchemaTypeIndex(field.schema())); - // map and array were coded without union index so need to be read the same way - if (effectiveSchema.getType() == Type.ARRAY) { - currentArray = new DirtyListWrapper<>(new ArrayList<>()); - currentPos = field.pos(); - currentFam = entry.getKey().getColumnFamily(); - currentSchema = field.schema().getElementType(); - currentField = field; - break; - case UNION:// default value of null acts like union with null - Schema effectiveSchema = field.schema().getTypes().get(firstNotNullSchemaTypeIndex(field.schema())); - // map and array were coded without union index so need to be - // read the same way - if (effectiveSchema.getType() == Type.ARRAY) { - currentArray = new DirtyListWrapper<>(new ArrayList<>()); - currentPos = field.pos(); - currentFam = entry.getKey().getColumnFamily(); - currentSchema = field.schema().getElementType(); - currentField = field; -- - currentArray.add(fromBytes(currentSchema, entry.getValue().get())); - break; - } - else if (effectiveSchema.getType() == Type.MAP) { - currentMap = new DirtyMapWrapper<>(new HashMap<Utf8, Object>()); - currentPos = field.pos(); - currentFam = entry.getKey().getColumnFamily(); - currentSchema = effectiveSchema.getValueType(); - currentArray.add(fromBytes(currentSchema, entry.getValue().get())); - break; - } else if (effectiveSchema.getType() == Type.MAP) { - currentMap = new DirtyMapWrapper<>(new HashMap<Utf8, Object>()); - currentPos = field.pos(); - currentFam = entry.getKey().getColumnFamily(); - currentSchema = effectiveSchema.getValueType(); -- - currentMap.put(new Utf8(entry.getKey().getColumnQualifierData().toArray()), - fromBytes(currentSchema, entry.getValue().get())); - break; - } - // continue like a regular top-level union - case RECORD: - SpecificDatumReader<?> reader = new SpecificDatumReader<Schema>(field.schema()); - persistent.put(field.pos(), reader.read(null, DecoderFactory.get().binaryDecoder(val, decoder))); - break; - default: - persistent.put(field.pos(), fromBytes(field.schema(), entry.getValue().get())); - } - } - currentMap.put(new Utf8(entry.getKey().getColumnQualifierData().toArray()), - fromBytes(currentSchema, entry.getValue().get())); - break; - } - // continue like a regular top-level union - case RECORD: - SpecificDatumReader<?> reader = new SpecificDatumReader<Schema>(field.schema()); - persistent.put(field.pos(), reader.read(null, DecoderFactory.get().binaryDecoder(val, decoder))); - break; - default: - persistent.put(field.pos(), fromBytes(field.schema(), entry.getValue().get())); - } - } -- - if (currentMap != null) { - persistent.put(currentPos, currentMap); - } else if (currentArray != null) { - persistent.put(currentPos, new GenericData.Array<T>(currentField.schema(), currentArray)); - } - if (currentMap != null) { - persistent.put(currentPos, currentMap); - } else if (currentArray != null) { - persistent.put(currentPos, new GenericData.Array<T>(currentField.schema(), currentArray)); - } -- - persistent.clearDirty(); - persistent.clearDirty(); -- - return row; - } - return row; - } -- - /** - * Retrieve field name from entry. - * @param entry The Key-Value entry - * @return String The field name - */ - private String getFieldName(Entry<Key, Value> entry) { - String fieldName = mapping.columnMap.get(new Pair<>(entry.getKey().getColumnFamily(), - entry.getKey().getColumnQualifier())); - if (fieldName == null) { - fieldName = mapping.columnMap.get(new Pair<Text,Text>(entry.getKey().getColumnFamily(), null)); - } - return fieldName; - } - /** - * Retrieve field name from entry. - * - * @param entry - * The Key-Value entry - * @return String The field name - */ - private String getFieldName(Entry<Key, Value> entry) { - String fieldName = mapping.columnMap - .get(new Pair<>(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier())); - if (fieldName == null) { - fieldName = mapping.columnMap.get(new Pair<Text, Text>(entry.getKey().getColumnFamily(), null)); - } - return fieldName; - } -- - private void setFetchColumns(Scanner scanner, String[] fields) { - fields = getFieldsToQuery(fields); - for (String field : fields) { - Pair<Text,Text> col = mapping.fieldMap.get(field); - if (col != null) { - if (col.getSecond() == null) { - scanner.fetchColumnFamily(col.getFirst()); - } else { - scanner.fetchColumn(col.getFirst(), col.getSecond()); - } - } else { - LOG.error("Mapping not found for field: {}", field); - } - } - } - private void setFetchColumns(Scanner scanner, String[] fields) { - fields = getFieldsToQuery(fields); - for (String field : fields) { - Pair<Text, Text> col = mapping.fieldMap.get(field); - if (col != null) { - if (col.getSecond() == null) { - scanner.fetchColumnFamily(col.getFirst()); - } else { - scanner.fetchColumn(col.getFirst(), col.getSecond()); - } - } else { - LOG.error("Mapping not found for field: {}", field); - } - } - } -- - @Override - public T get(K key, String[] fields) { - try { - // TODO make isolated scanner optional? - Scanner scanner = new IsolatedScanner(conn.createScanner(mapping.tableName, Authorizations.EMPTY)); - Range rowRange = new Range(new Text(toBytes(key))); - @Override - public T get(K key, String[] fields) { - try { - // TODO make isolated scanner optional? - Scanner scanner = new IsolatedScanner(conn.createScanner(mapping.tableName, Authorizations.EMPTY)); - Range rowRange = new Range(new Text(toBytes(key))); -- - scanner.setRange(rowRange); - setFetchColumns(scanner, fields); - scanner.setRange(rowRange); - setFetchColumns(scanner, fields); -- - T persistent = newPersistent(); - ByteSequence row = populate(scanner.iterator(), persistent); - if (row == null) - return null; - return persistent; - } catch (TableNotFoundException e) { - LOG.error(e.getMessage(), e); - return null; - } catch (IOException e) { - LOG.error(e.getMessage(), e); - return null; - } - } - T persistent = newPersistent(); - ByteSequence row = populate(scanner.iterator(), persistent); - if (row == null) - return null; - return persistent; - } catch (TableNotFoundException e) { - LOG.error(e.getMessage(), e); - return null; - } catch (IOException e) { - LOG.error(e.getMessage(), e); - return null; - } - } -- - @Override - public void put(K key, T val) { - @Override - public void put(K key, T val) { -- - try{ - Mutation m = new Mutation(new Text(toBytes(key))); - try { - Mutation m = new Mutation(new Text(toBytes(key))); -- - Schema schema = val.getSchema(); - List<Field> fields = schema.getFields(); - int count = 0; - Schema schema = val.getSchema(); - List<Field> fields = schema.getFields(); - int count = 0; -- - for (int i = 0; i < fields.size(); i++) { - if (!val.isDirty(i)) { - continue; - } - Field field = fields.get(i); - for (int i = 0; i < fields.size(); i++) { - if (!val.isDirty(i)) { - continue; - } - Field field = fields.get(i); -- - Object o = val.get(field.pos()); - Object o = val.get(field.pos()); -- - Pair<Text,Text> col = mapping.fieldMap.get(field.name()); - Pair<Text, Text> col = mapping.fieldMap.get(field.name()); -- - if (col == null) { - throw new GoraException("Please define the gora to accumulo mapping for field " + field.name()); - } - if (col == null) { - throw new GoraException("Please define the gora to accumulo mapping for field " + field.name()); - } -- - switch (field.schema().getType()) { - case MAP: - count = putMap(m, count, field.schema().getValueType(), o, col, field.name()); - break; - case ARRAY: - count = putArray(m, count, o, col, field.name()); - break; - case UNION: // default value of null acts like union with null - Schema effectiveSchema = field.schema().getTypes() - .get(firstNotNullSchemaTypeIndex(field.schema())); - // map and array need to compute qualifier - if (effectiveSchema.getType() == Type.ARRAY) { - count = putArray(m, count, o, col, field.name()); - break; - } - else if (effectiveSchema.getType() == Type.MAP) { - count = putMap(m, count, effectiveSchema.getValueType(), o, col, field.name()); - break; - } - // continue like a regular top-level union - case RECORD: - final SpecificDatumWriter<Object> writer = new SpecificDatumWriter<>(field.schema()); - final byte[] byteData = IOUtils.serialize(writer,o); - m.put(col.getFirst(), col.getSecond(), new Value(byteData)); - count++; - break; - default: - m.put(col.getFirst(), col.getSecond(), new Value(toBytes(o))); - count++; - } - switch (field.schema().getType()) { - case MAP: - count = putMap(m, count, field.schema().getValueType(), o, col, field.name()); - break; - case ARRAY: - count = putArray(m, count, o, col, field.name()); - break; - case UNION: // default value of null acts like union with null - Schema effectiveSchema = field.schema().getTypes().get(firstNotNullSchemaTypeIndex(field.schema())); - // map and array need to compute qualifier - if (effectiveSchema.getType() == Type.ARRAY) { - count = putArray(m, count, o, col, field.name()); - break; - } else if (effectiveSchema.getType() == Type.MAP) { - count = putMap(m, count, effectiveSchema.getValueType(), o, col, field.name()); - break; - } - // continue like a regular top-level union - case RECORD: - final SpecificDatumWriter<Object> writer = new SpecificDatumWriter<>(field.schema()); - final byte[] byteData = IOUtils.serialize(writer, o); - m.put(col.getFirst(), col.getSecond(), new Value(byteData)); - count++; - break; - default: - m.put(col.getFirst(), col.getSecond(), new Value(toBytes(o))); - count++; - } -- - } - } -- - if (count > 0) - try { - getBatchWriter().addMutation(m); - } catch (MutationsRejectedException e) { - LOG.error(e.getMessage(), e); - } - } catch (IOException e) { - LOG.error(e.getMessage(), e); - } - } - if (count > 0) - try { - getBatchWriter().addMutation(m); - } catch (MutationsRejectedException e) { - LOG.error(e.getMessage(), e); - } - } catch (IOException e) { - LOG.error(e.getMessage(), e); - } - } -- - private int putMap(Mutation m, int count, Schema valueType, Object o, Pair<Text, Text> col, String fieldName) throws GoraException { - private int putMap(Mutation m, int count, Schema valueType, Object o, Pair<Text, Text> col, String fieldName) - throws GoraException { -- - // First of all we delete map field on accumulo store - Text rowKey = new Text(m.getRow()); - Query<K, T> query = newQuery(); - query.setFields(fieldName); - query.setStartKey((K)rowKey.toString()); - query.setEndKey((K)rowKey.toString()); - deleteByQuery(query); - flush(); - if (o == null){ - return 0; - } - // First of all we delete map field on accumulo store - Text rowKey = new Text(m.getRow()); - Query<K, T> query = newQuery(); - query.setFields(fieldName); - query.setStartKey((K) rowKey.toString()); - query.setEndKey((K) rowKey.toString()); - deleteByQuery(query); - flush(); - if (o == null) { - return 0; - } -- - Set<?> es = ((Map<?, ?>)o).entrySet(); - for (Object entry : es) { - Object mapKey = ((Entry<?, ?>) entry).getKey(); - Object mapVal = ((Entry<?, ?>) entry).getValue(); - if ((o instanceof DirtyMapWrapper && ((DirtyMapWrapper<?, ?>)o).isDirty()) - || !(o instanceof DirtyMapWrapper)) { - m.put(col.getFirst(), new Text(toBytes(mapKey)), new Value(toBytes(valueType, mapVal))); - count++; - } - // TODO map value deletion - } - return count; - } - Set<?> es = ((Map<?, ?>) o).entrySet(); - for (Object entry : es) { - Object mapKey = ((Entry<?, ?>) entry).getKey(); - Object mapVal = ((Entry<?, ?>) entry).getValue(); - if ((o instanceof DirtyMapWrapper && ((DirtyMapWrapper<?, ?>) o).isDirty()) - || !(o instanceof DirtyMapWrapper)) { - m.put(col.getFirst(), new Text(toBytes(mapKey)), new Value(toBytes(valueType, mapVal))); - count++; - } - // TODO map value deletion - } - return count; - } -- - private int putArray(Mutation m, int count, Object o, Pair<Text, Text> col, String fieldName) { - private int putArray(Mutation m, int count, Object o, Pair<Text, Text> col, String fieldName) { -- - // First of all we delete array field on accumulo store - Text rowKey = new Text(m.getRow()); - Query<K, T> query = newQuery(); - query.setFields(fieldName); - query.setStartKey((K)rowKey.toString()); - query.setEndKey((K)rowKey.toString()); - deleteByQuery(query); - flush(); - if (o == null){ - return 0; - } - // First of all we delete array field on accumulo store - Text rowKey = new Text(m.getRow()); - Query<K, T> query = newQuery(); - query.setFields(fieldName); - query.setStartKey((K) rowKey.toString()); - query.setEndKey((K) rowKey.toString()); - deleteByQuery(query); - flush(); - if (o == null) { - return 0; - } -- - List<?> array = (List<?>) o; // both GenericArray and DirtyListWrapper - int j = 0; - for (Object item : array) { - m.put(col.getFirst(), new Text(toBytes(j++)), new Value(toBytes(item))); - count++; - } - return count; - } - List<?> array = (List<?>) o; // both GenericArray and DirtyListWrapper - int j = 0; - for (Object item : array) { - m.put(col.getFirst(), new Text(toBytes(j++)), new Value(toBytes(item))); - count++; - } - return count; - } -- - @Override - public boolean delete(K key) { - Query<K,T> q = newQuery(); - q.setKey(key); - return deleteByQuery(q) > 0; - } - @Override - public boolean delete(K key) { - Query<K, T> q = newQuery(); - q.setKey(key); - return deleteByQuery(q) > 0; - } -- - @Override - public long deleteByQuery(Query<K,T> query) { - try { - Scanner scanner = createScanner(query); - // add iterator that drops values on the server side - scanner.addScanIterator(new IteratorSetting(Integer.MAX_VALUE, SortedKeyIterator.class)); - RowIterator iterator = new RowIterator(scanner.iterator()); - @Override - public long deleteByQuery(Query<K, T> query) { - try { - Scanner scanner = createScanner(query); - // add iterator that drops values on the server side - scanner.addScanIterator(new IteratorSetting(Integer.MAX_VALUE, SortedKeyIterator.class)); - RowIterator iterator = new RowIterator(scanner.iterator()); -- - long count = 0; - long count = 0; -- - while (iterator.hasNext()) { - Iterator<Entry<Key,Value>> row = iterator.next(); - Mutation m = null; - while (row.hasNext()) { - Entry<Key,Value> entry = row.next(); - Key key = entry.getKey(); - if (m == null) - m = new Mutation(key.getRow()); - // TODO optimize to avoid continually creating column vis? prob does not matter for empty - m.putDelete(key.getColumnFamily(), key.getColumnQualifier(), new ColumnVisibility(key.getColumnVisibility()), key.getTimestamp()); - } - getBatchWriter().addMutation(m); - count++; - } - while (iterator.hasNext()) { - Iterator<Entry<Key, Value>> row = iterator.next(); - Mutation m = null; - while (row.hasNext()) { - Entry<Key, Value> entry = row.next(); - Key key = entry.getKey(); - if (m == null) - m = new Mutation(key.getRow()); - // TODO optimize to avoid continually creating column vis? - // prob does not matter for empty - m.putDelete(key.getColumnFamily(), key.getColumnQualifier(), - new ColumnVisibility(key.getColumnVisibility()), key.getTimestamp()); - } - getBatchWriter().addMutation(m); - count++; - } -- - return count; - } catch (TableNotFoundException e) { - // TODO return 0? - LOG.error(e.getMessage(), e); - return 0; - } catch (MutationsRejectedException e) { - LOG.error(e.getMessage(), e); - return 0; - } catch (IOException e){ - LOG.error(e.getMessage(), e); - return 0; - } - } - return count; - } catch (TableNotFoundException e) { - // TODO return 0? - LOG.error(e.getMessage(), e); - return 0; - } catch (MutationsRejectedException e) { - LOG.error(e.getMessage(), e); - return 0; - } catch (IOException e) { - LOG.error(e.getMessage(), e); - return 0; - } - } -- - private Range createRange(Query<K,T> query) { - Text startRow = null; - Text endRow = null; - private Range createRange(Query<K, T> query) { - Text startRow = null; - Text endRow = null; -- - if (query.getStartKey() != null) - startRow = new Text(toBytes(query.getStartKey())); - if (query.getStartKey() != null) - startRow = new Text(toBytes(query.getStartKey())); -- - if (query.getEndKey() != null) - endRow = new Text(toBytes(query.getEndKey())); - if (query.getEndKey() != null) - endRow = new Text(toBytes(query.getEndKey())); -- - return new Range(startRow, true, endRow, true); - return new Range(startRow, true, endRow, true); -- - } - } -- - private Scanner createScanner(Query<K,T> query) throws TableNotFoundException { - // TODO make isolated scanner optional? - Scanner scanner = new IsolatedScanner(conn.createScanner(mapping.tableName, Authorizations.EMPTY)); - setFetchColumns(scanner, query.getFields()); - private Scanner createScanner(Query<K, T> query) throws TableNotFoundException { - // TODO make isolated scanner optional? - Scanner scanner = new IsolatedScanner(conn.createScanner(mapping.tableName, Authorizations.EMPTY)); - setFetchColumns(scanner, query.getFields()); -- - scanner.setRange(createRange(query)); - scanner.setRange(createRange(query)); -- - if (query.getStartTime() != -1 || query.getEndTime() != -1) { - IteratorSetting is = new IteratorSetting(30, TimestampFilter.class); - if (query.getStartTime() != -1) - TimestampFilter.setStart(is, query.getStartTime(), true); - if (query.getEndTime() != -1) - TimestampFilter.setEnd(is, query.getEndTime(), true); - if (query.getStartTime() != -1 || query.getEndTime() != -1) { - IteratorSetting is = new IteratorSetting(30, TimestampFilter.class); - if (query.getStartTime() != -1) - TimestampFilter.setStart(is, query.getStartTime(), true); - if (query.getEndTime() != -1) - TimestampFilter.setEnd(is, query.getEndTime(), true); -- - scanner.addScanIterator(is); - } - scanner.addScanIterator(is); - } -- - return scanner; - } - return scanner; - } -- - /** - * Execute the query and return the result. - */ - @Override - public Result<K,T> execute(Query<K,T> query) { - try { - Scanner scanner = createScanner(query); - return new AccumuloResult<>(this, query, scanner); - } catch (TableNotFoundException e) { - // TODO return empty result? - LOG.error(e.getMessage(), e); - return null; - } - } - /** - * Execute the query and return the result. - */ - @Override - public Result<K, T> execute(Query<K, T> query) { - try { - Scanner scanner = createScanner(query); - return new AccumuloResult<>(this, query, scanner); - } catch (TableNotFoundException e) { - // TODO return empty result? - LOG.error(e.getMessage(), e); - return null; - } - } -- - @Override - public Query<K,T> newQuery() { - return new AccumuloQuery<>(this); - } - @Override - public Query<K, T> newQuery() { - return new AccumuloQuery<>(this); - } -- - Text pad(Text key, int bytes) { - if (key.getLength() < bytes) - key = new Text(key); - Text pad(Text key, int bytes) { - if (key.getLength() < bytes) - key = new Text(key); -- - while (key.getLength() < bytes) { - key.append(new byte[] {0}, 0, 1); - } - while (key.getLength() < bytes) { - key.append(new byte[] { 0 }, 0, 1); - } -- - return key; - } - return key; - } -- - @Override - public List<PartitionQuery<K,T>> getPartitions(Query<K,T> query) throws IOException { - try { - TabletLocator tl; - if (conn instanceof MockConnector) - tl = new MockTabletLocator(); - else - tl = TabletLocator.getLocator(new ClientContext(conn.getInstance(), credentials, AccumuloConfiguration.getTableConfiguration(conn, Tables.getTableId(conn.getInstance(), mapping.tableName))), new Text(Tables.getTableId(conn.getInstance(), mapping.tableName))); - @Override - public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query) throws IOException { - try { - TabletLocator tl; - if (conn instanceof MockConnector) - tl = new MockTabletLocator(); - else - tl = TabletLocator.getLocator( - new ClientContext(conn.getInstance(), credentials, - AccumuloConfiguration.getTableConfiguration(conn, - Tables.getTableId(conn.getInstance(), mapping.tableName))), - new Text(Tables.getTableId(conn.getInstance(), mapping.tableName))); -- - Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<>(); - Map<String, Map<KeyExtent, List<Range>>> binnedRanges = new HashMap<>(); -- - tl.invalidateCache(); - while (tl.binRanges(new ClientContext(conn.getInstance(), credentials, AccumuloConfiguration.getTableConfiguration(conn, Tables.getTableId(conn.getInstance(), mapping.tableName))), Collections.singletonList(createRange(query)), binnedRanges).size() > 0) { - // TODO log? - if (!Tables.exists(conn.getInstance(), Tables.getTableId(conn.getInstance(), mapping.tableName))) - throw new TableDeletedException(Tables.getTableId(conn.getInstance(), mapping.tableName)); - else if (Tables.getTableState(conn.getInstance(), Tables.getTableId(conn.getInstance(), mapping.tableName)) == TableState.OFFLINE) - throw new TableOfflineException(conn.getInstance(), Tables.getTableId(conn.getInstance(), mapping.tableName)); - UtilWaitThread.sleep(100); - tl.invalidateCache(); - } - tl.invalidateCache(); - while (tl.binRanges( - new ClientContext(conn.getInstance(), credentials, - AccumuloConfiguration.getTableConfiguration(conn, - Tables.getTableId(conn.getInstance(), mapping.tableName))), - Collections.singletonList(createRange(query)), binnedRanges).size() > 0) { - // TODO log? - if (!Tables.exists(conn.getInstance(), Tables.getTableId(conn.getInstance(), mapping.tableName))) - throw new TableDeletedException(Tables.getTableId(conn.getInstance(), mapping.tableName)); - else if (Tables.getTableState(conn.getInstance(), - Tables.getTableId(conn.getInstance(), mapping.tableName)) == TableState.OFFLINE) - throw new TableOfflineException(conn.getInstance(), - Tables.getTableId(conn.getInstance(), mapping.tableName)); - UtilWaitThread.sleep(100); - tl.invalidateCache(); - } -- - List<PartitionQuery<K,T>> ret = new ArrayList<>(); - List<PartitionQuery<K, T>> ret = new ArrayList<>(); -- - Text startRow = null; - Text endRow = null; - if (query.getStartKey() != null) - startRow = new Text(toBytes(query.getStartKey())); - if (query.getEndKey() != null) - endRow = new Text(toBytes(query.getEndKey())); - Text startRow = null; - Text endRow = null; - if (query.getStartKey() != null) - startRow = new Text(toBytes(query.getStartKey())); - if (query.getEndKey() != null) - endRow = new Text(toBytes(query.getEndKey())); -- - //hadoop expects hostnames, accumulo keeps track of IPs... so need to convert - HashMap<String,String> hostNameCache = new HashMap<>(); - // hadoop expects hostnames, accumulo keeps track of IPs... so need - // to convert - HashMap<String, String> hostNameCache = new HashMap<>(); -- - for (Entry<String,Map<KeyExtent,List<Range>>> entry : binnedRanges.entrySet()) { - String ip = entry.getKey().split(":", 2)[0]; - String location = hostNameCache.get(ip); - if (location == null) { - InetAddress inetAddress = InetAddress.getByName(ip); - location = inetAddress.getHostName(); - hostNameCache.put(ip, location); - } - for (Entry<String, Map<KeyExtent, List<Range>>> entry : binnedRanges.entrySet()) { - String ip = entry.getKey().split(":", 2)[0]; - String location = hostNameCache.get(ip); - if (location == null) { - InetAddress inetAddress = InetAddress.getByName(ip); - location = inetAddress.getHostName(); - hostNameCache.put(ip, location); - } -- - Map<KeyExtent,List<Range>> tablets = entry.getValue(); - for (KeyExtent ke : tablets.keySet()) { - Map<KeyExtent, List<Range>> tablets = entry.getValue(); - for (KeyExtent ke : tablets.keySet()) { -- - K startKey = null; - if (startRow == null || !ke.contains(startRow)) { - if (ke.getPrevEndRow() != null) { - startKey = followingKey(encoder, getKeyClass(), getBytes(ke.getPrevEndRow())); - } - } else { - startKey = fromBytes(getKeyClass(), getBytes(startRow)); - } - K startKey = null; - if (startRow == null || !ke.contains(startRow)) { - if (ke.getPrevEndRow() != null) { - startKey = followingKey(encoder, getKeyClass(), getBytes(ke.getPrevEndRow())); - } - } else { - startKey = fromBytes(getKeyClass(), getBytes(startRow)); - } -- - K endKey = null; - if (endRow == null || !ke.contains(endRow)) { - if (ke.getEndRow() != null) - endKey = lastPossibleKey(encoder, getKeyClass(), getBytes(ke.getEndRow())); - } else { - endKey = fromBytes(getKeyClass(), getBytes(endRow)); - } - K endKey = null; - if (endRow == null || !ke.contains(endRow)) { - if (ke.getEndRow() != null) - endKey = lastPossibleKey(encoder, getKeyClass(), getBytes(ke.getEndRow())); - } else { - endKey = fromBytes(getKeyClass(), getBytes(endRow)); - } -- - PartitionQueryImpl<K, T> pqi = new PartitionQueryImpl<>(query, startKey, endKey, location); - pqi.setConf(getConf()); - ret.add(pqi); - } - } - PartitionQueryImpl<K, T> pqi = new PartitionQueryImpl<>(query, startKey, endKey, location); - pqi.setConf(getConf()); - ret.add(pqi); - } - } -- - return ret; - } catch (TableNotFoundException | AccumuloException | AccumuloSecurityException e) { - throw new IOException(e); - } - return ret; - } catch (TableNotFoundException | AccumuloException | AccumuloSecurityException e) { - throw new IOException(e); - } -- - } - } -- - static <K> K lastPossibleKey(Encoder encoder, Class<K> clazz, byte[] er) { - static <K> K lastPossibleKey(Encoder encoder, Class<K> clazz, byte[] er) { -- - if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)) { - throw new UnsupportedOperationException(); - } else if (clazz.equals(Boolean.TYPE) || clazz.equals(Boolean.class)) { - throw new UnsupportedOperationException(); - } else if (clazz.equals(Short.TYPE) || clazz.equals(Short.class)) { - return fromBytes(encoder, clazz, encoder.lastPossibleKey(2, er)); - } else if (clazz.equals(Integer.TYPE) || clazz.equals(Integer.class)) { - return fromBytes(encoder, clazz, encoder.lastPossibleKey(4, er)); - } else if (clazz.equals(Long.TYPE) || clazz.equals(Long.class)) { - return fromBytes(encoder, clazz, encoder.lastPossibleKey(8, er)); - } else if (clazz.equals(Float.TYPE) || clazz.equals(Float.class)) { - return fromBytes(encoder, clazz, encoder.lastPossibleKey(4, er)); - } else if (clazz.equals(Double.TYPE) || clazz.equals(Double.class)) { - return fromBytes(encoder, clazz, encoder.lastPossibleKey(8, er)); - } else if (clazz.equals(String.class)) { - throw new UnsupportedOperationException(); - } else if (clazz.equals(Utf8.class)) { - return fromBytes(encoder, clazz, er); - } - if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)) { - throw new UnsupportedOperationException(); - } else if (clazz.equals(Boolean.TYPE) || clazz.equals(Boolean.class)) { - throw new UnsupportedOperationException(); - } else if (clazz.equals(Short.TYPE) || clazz.equals(Short.class)) { - return fromBytes(encoder, clazz, encoder.lastPossibleKey(2, er)); - } else if (clazz.equals(Integer.TYPE) || clazz.equals(Integer.class)) { - return fromBytes(encoder, clazz, encoder.lastPossibleKey(4, er)); - } else if (clazz.equals(Long.TYPE) || clazz.equals(Long.class)) { - return fromBytes(encoder, clazz, encoder.lastPossibleKey(8, er)); - } else if (clazz.equals(Float.TYPE) || clazz.equals(Float.class)) { - return fromBytes(encoder, clazz, encoder.lastPossibleKey(4, er)); - } else if (clazz.equals(Double.TYPE) || clazz.equals(Double.class)) { - return fromBytes(encoder, clazz, encoder.lastPossibleKey(8, er)); - } else if (clazz.equals(String.class)) { - throw new UnsupportedOperationException(); - } else if (clazz.equals(Utf8.class)) { - return fromBytes(encoder, clazz, er); - } -- - throw new IllegalArgumentException(UNKOWN + clazz.getName()); - } - throw new IllegalArgumentException(UNKOWN + clazz.getName()); - } -- - @SuppressWarnings("unchecked") - static <K> K followingKey(Encoder encoder, Class<K> clazz, byte[] per) { - @SuppressWarnings("unchecked") - static <K> K followingKey(Encoder encoder, Class<K> clazz, byte[] per) { -- - if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)) { - return (K) Byte.valueOf(encoder.followingKey(1, per)[0]); - } else if (clazz.equals(Boolean.TYPE) || clazz.equals(Boolean.class)) { - throw new UnsupportedOperationException(); - } else if (clazz.equals(Short.TYPE) || clazz.equals(Short.class)) { - return fromBytes(encoder, clazz, encoder.followingKey(2, per)); - } else if (clazz.equals(Integer.TYPE) || clazz.equals(Integer.class)) { - return fromBytes(encoder, clazz, encoder.followingKey(4, per)); - } else if (clazz.equals(Long.TYPE) || clazz.equals(Long.class)) { - return fromBytes(encoder, clazz, encoder.followingKey(8, per)); - } else if (clazz.equals(Float.TYPE) || clazz.equals(Float.class)) { - return fromBytes(encoder, clazz, encoder.followingKey(4, per)); - } else if (clazz.equals(Double.TYPE) || clazz.equals(Double.class)) { - return fromBytes(encoder, clazz, encoder.followingKey(8, per)); - } else if (clazz.equals(String.class)) { - throw new UnsupportedOperationException(); - } else if (clazz.equals(Utf8.class)) { - return fromBytes(encoder, clazz, Arrays.copyOf(per, per.length + 1)); - } - if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)) { - return (K) Byte.valueOf(encoder.followingKey(1, per)[0]); - } else if (clazz.equals(Boolean.TYPE) || clazz.equals(Boolean.class)) { - throw new UnsupportedOperationException(); - } else if (clazz.equals(Short.TYPE) || clazz.equals(Short.class)) { - return fromBytes(encoder, clazz, encoder.followingKey(2, per)); - } else if (clazz.equals(Integer.TYPE) || clazz.equals(Integer.class)) { - return fromBytes(encoder, clazz, encoder.followingKey(4, per)); - } else if (clazz.equals(Long.TYPE) || clazz.equals(Long.class)) { - return fromBytes(encoder, clazz, encoder.followingKey(8, per)); - } else if (clazz.equals(Float.TYPE) || clazz.equals(Float.class)) { - return fromBytes(encoder, clazz, encoder.followingKey(4, per)); - } else if (clazz.equals(Double.TYPE) || clazz.equals(Double.class)) { - return fromBytes(encoder, clazz, encoder.followingKey(8, per)); - } else if (clazz.equals(String.class)) { - throw new UnsupportedOperationException(); - } else if (clazz.equals(Utf8.class)) { - return fromBytes(encoder, clazz, Arrays.copyOf(per, per.length + 1)); - } -- - throw new IllegalArgumentException(UNKOWN + clazz.getName()); - } - throw new IllegalArgumentException(UNKOWN + clazz.getName()); - } -- - @Override - public void flush() { - try { - if (batchWriter != null) { - batchWriter.flush(); - } - } catch (MutationsRejectedException e) { - LOG.error(e.getMessage(), e); - } - } - @Override - public void flush() { - try { - if (batchWriter != null) { - batchWriter.flush(); - } - } catch (MutationsRejectedException e) { - LOG.error(e.getMessage(), e); - } - } -- - @Override - public void close() { - try { - if (batchWriter != null) { - batchWriter.close(); - batchWriter = null; - } - } catch (MutationsRejectedException e) { - LOG.error(e.getMessage(), e); - } - } - @Override - public void close() { - try { - if (batchWriter != null) { - batchWriter.close(); - batchWriter = null; - } - } catch (MutationsRejectedException e) { - LOG.error(e.getMessage(), e); - } - } --} ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.gora.accumulo.store; ++ ++import java.io.ByteArrayOutputStream; ++import java.io.IOException; ++import java.net.InetAddress; ++import java.nio.ByteBuffer; ++import java.util.ArrayList; ++import java.util.Arrays; ++import java.util.Collections; ++import java.util.HashMap; ++import java.util.Iterator; ++import java.util.List; ++import java.util.Map; ++import java.util.Map.Entry; ++import java.util.Properties; ++import java.util.Set; ++import java.util.concurrent.TimeUnit; ++ ++import javax.xml.parsers.DocumentBuilder; ++import javax.xml.parsers.DocumentBuilderFactory; ++ ++import org.apache.accumulo.core.client.AccumuloException; ++import org.apache.accumulo.core.client.AccumuloSecurityException; ++import org.apache.accumulo.core.client.BatchWriter; ++import org.apache.accumulo.core.client.BatchWriterConfig; ++import org.apache.accumulo.core.client.Connector; ++import org.apache.accumulo.core.client.IsolatedScanner; ++import org.apache.accumulo.core.client.IteratorSetting; ++import org.apache.accumulo.core.client.MutationsRejectedException; ++import org.apache.accumulo.core.client.RowIterator; ++import org.apache.accumulo.core.client.Scanner; ++import org.apache.accumulo.core.client.TableDeletedException; ++import org.apache.accumulo.core.client.TableExistsException; ++import org.apache.accumulo.core.client.TableNotFoundException; ++import org.apache.accumulo.core.client.TableOfflineException; ++import org.apache.accumulo.core.client.ZooKeeperInstance; ++import org.apache.accumulo.core.client.impl.ClientContext; ++import org.apache.accumulo.core.client.impl.Tables; ++import org.apache.accumulo.core.client.impl.TabletLocator; ++import org.apache.accumulo.core.client.mock.MockConnector; ++import org.apache.accumulo.core.client.mock.MockInstance; ++import org.apache.accumulo.core.client.mock.impl.MockTabletLocator; ++import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; ++import org.apache.accumulo.core.client.security.tokens.PasswordToken; ++import org.apache.accumulo.core.conf.AccumuloConfiguration; ++import org.apache.accumulo.core.data.ByteSequence; ++import org.apache.accumulo.core.data.Key; ++import org.apache.accumulo.core.data.impl.KeyExtent; ++import org.apache.accumulo.core.data.Mutation; ++import org.apache.accumulo.core.data.Range; ++import org.apache.accumulo.core.data.Value; ++import org.apache.accumulo.core.iterators.SortedKeyIterator; ++import org.apache.accumulo.core.iterators.user.TimestampFilter; ++import org.apache.accumulo.core.master.state.tables.TableState; ++import org.apache.accumulo.core.security.Authorizations; ++import org.apache.accumulo.core.security.ColumnVisibility; ++import org.apache.accumulo.core.client.impl.Credentials; ++import org.apache.accumulo.core.util.Pair; ++import org.apache.accumulo.core.util.UtilWaitThread; ++import org.apache.avro.Schema; ++import org.apache.avro.Schema.Field; ++import org.apache.avro.Schema.Type; ++import org.apache.avro.generic.GenericData; ++import org.apache.avro.io.BinaryDecoder; ++import org.apache.avro.io.Decoder; ++import org.apache.avro.io.DecoderFactory; ++import org.apache.avro.io.EncoderFactory; ++import org.apache.avro.specific.SpecificDatumReader; ++import org.apache.avro.specific.SpecificDatumWriter; ++import org.apache.avro.util.Utf8; ++import org.apache.gora.accumulo.encoders.BinaryEncoder; ++import org.apache.gora.accumulo.encoders.Encoder; ++import org.apache.gora.accumulo.query.AccumuloQuery; ++import org.apache.gora.accumulo.query.AccumuloResult; ++import org.apache.gora.persistency.impl.DirtyListWrapper; ++import org.apache.gora.persistency.impl.DirtyMapWrapper; ++import org.apache.gora.persistency.impl.PersistentBase; ++import org.apache.gora.query.PartitionQuery; ++import org.apache.gora.query.Query; ++import org.apache.gora.query.Result; ++import org.apache.gora.query.impl.PartitionQueryImpl; ++import org.apache.gora.store.DataStoreFactory; ++import org.apache.gora.store.impl.DataStoreBase; ++import org.apache.gora.util.AvroUtils; ++import org.apache.gora.util.GoraException; ++import org.apache.gora.util.IOUtils; ++import org.apache.hadoop.io.Text; ++import org.slf4j.Logger; ++import org.slf4j.LoggerFactory; ++import org.w3c.dom.Document; ++import org.w3c.dom.Element; ++import org.w3c.dom.NodeList; ++ ++/** ++ * Implementation of a Accumulo data store to be used by gora. ++ * ++ * @param <K> ++ * class to be used for the key ++ * @param <T> ++ * class to be persisted within the store ++ */ ++public class AccumuloStore<K,T extends PersistentBase> extends DataStoreBase<K,T> { ++ ++ protected static final String MOCK_PROPERTY = "accumulo.mock"; ++ protected static final String INSTANCE_NAME_PROPERTY = "accumulo.instance"; ++ protected static final String ZOOKEEPERS_NAME_PROPERTY = "accumulo.zookeepers"; ++ protected static final String USERNAME_PROPERTY = "accumulo.user"; ++ protected static final String PASSWORD_PROPERTY = "accumulo.password"; ++ protected static final String DEFAULT_MAPPING_FILE = "gora-accumulo-mapping.xml"; ++ ++ private final static String UNKOWN = "Unknown type "; ++ ++ private Connector conn; ++ private BatchWriter batchWriter; ++ private AccumuloMapping mapping; ++ private Credentials credentials; ++ private Encoder encoder; ++ ++ public static final Logger LOG = LoggerFactory.getLogger(AccumuloStore.class); ++ ++ public Object fromBytes(Schema schema, byte[] data) throws IOException { ++ Schema fromSchema = null; ++ if (schema.getType() == Type.UNION) { ++ try { ++ Decoder decoder = DecoderFactory.get().binaryDecoder(data, null); ++ int unionIndex = decoder.readIndex(); ++ List<Schema> possibleTypes = schema.getTypes(); ++ fromSchema = possibleTypes.get(unionIndex); ++ Schema effectiveSchema = possibleTypes.get(unionIndex); ++ if (effectiveSchema.getType() == Type.NULL) { ++ decoder.readNull(); ++ return null; ++ } else { ++ data = decoder.readBytes(null).array(); ++ } ++ } catch (IOException e) { ++ LOG.error(e.getMessage()); ++ throw new GoraException("Error decoding union type: ", e); ++ } ++ } else { ++ fromSchema = schema; ++ } ++ return fromBytes(encoder, fromSchema, data); ++ } ++ ++ public static Object fromBytes(Encoder encoder, Schema schema, byte data[]) throws IOException { ++ switch (schema.getType()) { ++ case BOOLEAN: ++ return encoder.decodeBoolean(data); ++ case DOUBLE: ++ return encoder.decodeDouble(data); ++ case FLOAT: ++ return encoder.decodeFloat(data); ++ case INT: ++ return encoder.decodeInt(data); ++ case LONG: ++ return encoder.decodeLong(data); ++ case STRING: ++ return new Utf8(data); ++ case BYTES: ++ return ByteBuffer.wrap(data); ++ case ENUM: ++ return AvroUtils.getEnumValue(schema, encoder.decodeInt(data)); ++ case ARRAY: ++ break; ++ case FIXED: ++ break; ++ case MAP: ++ break; ++ case NULL: ++ break; ++ case RECORD: ++ break; ++ case UNION: ++ break; ++ default: ++ break; ++ } ++ throw new IllegalArgumentException(UNKOWN + schema.getType()); ++ ++ } ++ ++ private static byte[] getBytes(Text text) { ++ byte[] bytes = text.getBytes(); ++ if (bytes.length != text.getLength()) { ++ bytes = new byte[text.getLength()]; ++ System.arraycopy(text.getBytes(), 0, bytes, 0, bytes.length); ++ } ++ return bytes; ++ } ++ ++ public K fromBytes(Class<K> clazz, byte[] val) { ++ return fromBytes(encoder, clazz, val); ++ } ++ ++ @SuppressWarnings("unchecked") ++ public static <K> K fromBytes(Encoder encoder, Class<K> clazz, byte[] val) { ++ try { ++ if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)) { ++ return (K) Byte.valueOf(encoder.decodeByte(val)); ++ } else if (clazz.equals(Boolean.TYPE) || clazz.equals(Boolean.class)) { ++ return (K) Boolean.valueOf(encoder.decodeBoolean(val)); ++ } else if (clazz.equals(Short.TYPE) || clazz.equals(Short.class)) { ++ return (K) Short.valueOf(encoder.decodeShort(val)); ++ } else if (clazz.equals(Integer.TYPE) || clazz.equals(Integer.class)) { ++ return (K) Integer.valueOf(encoder.decodeInt(val)); ++ } else if (clazz.equals(Long.TYPE) || clazz.equals(Long.class)) { ++ return (K) Long.valueOf(encoder.decodeLong(val)); ++ } else if (clazz.equals(Float.TYPE) || clazz.equals(Float.class)) { ++ return (K) Float.valueOf(encoder.decodeFloat(val)); ++ } else if (clazz.equals(Double.TYPE) || clazz.equals(Double.class)) { ++ return (K) Double.valueOf(encoder.decodeDouble(val)); ++ } else if (clazz.equals(String.class)) { ++ return (K) new String(val, "UTF-8"); ++ } else if (clazz.equals(Utf8.class)) { ++ return (K) new Utf8(val); ++ } ++ ++ throw new IllegalArgumentException(UNKOWN + clazz.getName()); ++ } catch (IOException ioe) { ++ LOG.error(ioe.getMessage()); ++ throw new RuntimeException(ioe); ++ } ++ } ++ ++ private static byte[] copyIfNeeded(byte b[], int offset, int len) { ++ if (len != b.length || offset != 0) { ++ byte[] copy = new byte[len]; ++ System.arraycopy(b, offset, copy, 0, copy.length); ++ b = copy; ++ } ++ return b; ++ } ++ ++ public byte[] toBytes(Schema toSchema, Object o) { ++ if (toSchema != null && toSchema.getType() == Type.UNION) { ++ ByteArrayOutputStream baos = new ByteArrayOutputStream(); ++ org.apache.avro.io.BinaryEncoder avroEncoder = EncoderFactory.get().binaryEncoder(baos, null); ++ int unionIndex = 0; ++ try { ++ if (o == null) { ++ unionIndex = firstNullSchemaTypeIndex(toSchema); ++ avroEncoder.writeIndex(unionIndex); ++ avroEncoder.writeNull(); ++ } else { ++ unionIndex = firstNotNullSchemaTypeIndex(toSchema); ++ avroEncoder.writeIndex(unionIndex); ++ avroEncoder.writeBytes(toBytes(o)); ++ } ++ avroEncoder.flush(); ++ return baos.toByteArray(); ++ } catch (IOException e) { ++ LOG.error(e.getMessage()); ++ return toBytes(o); ++ } ++ } else { ++ return toBytes(o); ++ } ++ } ++ ++ private int firstNullSchemaTypeIndex(Schema toSchema) { ++ List<Schema> possibleTypes = toSchema.getTypes(); ++ int unionIndex = 0; ++ for (int i = 0; i < possibleTypes.size(); i++ ) { ++ Type pType = possibleTypes.get(i).getType(); ++ if (pType == Type.NULL) { // FIXME HUGE kludge to pass tests ++ unionIndex = i; break; ++ } ++ } ++ return unionIndex; ++ } ++ ++ private int firstNotNullSchemaTypeIndex(Schema toSchema) { ++ List<Schema> possibleTypes = toSchema.getTypes(); ++ int unionIndex = 0; ++ for (int i = 0; i < possibleTypes.size(); i++ ) { ++ Type pType = possibleTypes.get(i).getType(); ++ if (pType != Type.NULL) { // FIXME HUGE kludge to pass tests ++ unionIndex = i; break; ++ } ++ } ++ return unionIndex; ++ } ++ ++ public byte[] toBytes(Object o) { ++ return toBytes(encoder, o); ++ } ++ ++ public static byte[] toBytes(Encoder encoder, Object o) { ++ ++ try { ++ if (o instanceof String) { ++ return ((String) o).getBytes("UTF-8"); ++ } else if (o instanceof Utf8) { ++ return copyIfNeeded(((Utf8) o).getBytes(), 0, ((Utf8) o).getByteLength()); ++ } else if (o instanceof ByteBuffer) { ++ return copyIfNeeded(((ByteBuffer) o).array(), ((ByteBuffer) o).arrayOffset() + ((ByteBuffer) o).position(), ((ByteBuffer) o).remaining()); ++ } else if (o instanceof Long) { ++ return encoder.encodeLong((Long) o); ++ } else if (o instanceof Integer) { ++ return encoder.encodeInt((Integer) o); ++ } else if (o instanceof Short) { ++ return encoder.encodeShort((Short) o); ++ } else if (o instanceof Byte) { ++ return encoder.encodeByte((Byte) o); ++ } else if (o instanceof Boolean) { ++ return encoder.encodeBoolean((Boolean) o); ++ } else if (o instanceof Float) { ++ return encoder.encodeFloat((Float) o); ++ } else if (o instanceof Double) { ++ return encoder.encodeDouble((Double) o); ++ } else if (o instanceof Enum) { ++ return encoder.encodeInt(((Enum<?>) o).ordinal()); ++ } ++ } catch (IOException ioe) { ++ throw new RuntimeException(ioe); ++ } ++ ++ throw new IllegalArgumentException(UNKOWN + o.getClass().getName()); ++ } ++ ++ private BatchWriter getBatchWriter() throws IOException { ++ if (batchWriter == null) ++ try { ++ BatchWriterConfig batchWriterConfig = new BatchWriterConfig(); ++ batchWriterConfig.setMaxMemory(10000000); ++ batchWriterConfig.setMaxLatency(60000L, TimeUnit.MILLISECONDS); ++ batchWriterConfig.setMaxWriteThreads(4); ++ batchWriter = conn.createBatchWriter(mapping.tableName, batchWriterConfig); ++ } catch (TableNotFoundException e) { ++ throw new IOException(e); ++ } ++ return batchWriter; ++ } ++ ++ /** ++ * Initialize the data store by reading the credentials, setting the client's properties up and ++ * reading the mapping file. Initialize is called when then the call to ++ * {@link org.apache.gora.store.DataStoreFactory#createDataStore} is made. ++ * ++ * @param keyClass ++ * @param persistentClass ++ * @param properties ++ */ ++ @Override ++ public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) { ++ try{ ++ super.initialize(keyClass, persistentClass, properties); ++ ++ String mock = DataStoreFactory.findProperty(properties, this, MOCK_PROPERTY, null); ++ String mappingFile = DataStoreFactory.getMappingFile(properties, this, DEFAULT_MAPPING_FILE); ++ String user = DataStoreFactory.findProperty(properties, this, USERNAME_PROPERTY, null); ++ String password = DataStoreFactory.findProperty(properties, this, PASSWORD_PROPERTY, null); ++ ++ mapping = readMapping(mappingFile); ++ ++ if (mapping.encoder == null || "".equals(mapping.encoder)) { ++ encoder = new BinaryEncoder(); ++ } else { ++ try { ++ encoder = (Encoder) getClass().getClassLoader().loadClass(mapping.encoder).newInstance(); ++ } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) { ++ throw new IOException(e); ++ } ++ } ++ ++ try { ++ AuthenticationToken token = new PasswordToken(password);
<TRUNCATED>
