Github user djkevincr commented on a diff in the pull request:

    https://github.com/apache/gora/pull/110#discussion_r130017932
  
    --- Diff: 
gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java
 ---
    @@ -0,0 +1,292 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.gora.cassandra.serializers;
    +
    +import org.apache.avro.Schema;
    +import org.apache.avro.specific.SpecificData;
    +import org.apache.avro.util.Utf8;
    +import org.apache.gora.cassandra.bean.CassandraKey;
    +import org.apache.gora.cassandra.bean.Field;
    +import org.apache.gora.cassandra.store.CassandraMapping;
    +import org.apache.gora.persistency.Persistent;
    +import org.apache.gora.persistency.impl.DirtyListWrapper;
    +import org.apache.gora.persistency.impl.DirtyMapWrapper;
    +import org.apache.gora.persistency.impl.PersistentBase;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.nio.ByteBuffer;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * Utility class for Avro serialization.
    + */
    +class AvroCassandraUtils {
    +
    +  /**
    +   * Default schema index (position 0), used when AVRO union data types are stored.
    +   */
    +  private static final int DEFAULT_UNION_SCHEMA = 0;
    +
    +  private static final Logger LOG = 
LoggerFactory.getLogger(AvroCassandraUtils.class);
    +
    +  /**
    +   * Collects the key column names and their values for the given key object.
    +   * <p>
    +   * Names are appended to {@code keys} and the matching values to {@code values}:
    +   * <ul>
    +   * <li>no partition key defined: the default Cassandra key field name is paired
    +   * with {@code key.toString()};</li>
    +   * <li>partition key defined, no Cassandra key in the mapping: every inlined
    +   * partition key field name is paired with the {@code key} object itself;</li>
    +   * <li>partition key defined and a Cassandra key present: {@code key} must be a
    +   * {@link PersistentBase}; each of its schema fields that the mapping knows is
    +   * added, with its value passed through {@link #getFieldValueFromAvroBean}.
    +   * Fields unknown to the mapping are skipped with a debug message. If
    +   * {@code key} is not a {@link PersistentBase}, an error is logged and
    +   * nothing is appended.</li>
    +   * </ul>
    +   *
    +   * @param cassandraMapping mapping consulted for the key configuration
    +   * @param key              the key object
    +   * @param keys             output list receiving the key field names
    +   * @param values           output list receiving the corresponding values
    +   */
    +  static void processKeys(CassandraMapping cassandraMapping, Object key,
    +                          List<String> keys, List<Object> values) {
    +    CassandraKey cassandraKey = cassandraMapping.getCassandraKey();
    +    if (!cassandraMapping.isPartitionKeyDefined()) {
    +      keys.add(cassandraMapping.getDefaultCassandraKey().getFieldName());
    +      values.add(key.toString());
    +      return;
    +    }
    +    if (cassandraKey == null) {
    +      for (Field field : cassandraMapping.getInlinedDefinedPartitionKeys()) {
    +        keys.add(field.getFieldName());
    +        values.add(key);
    +      }
    +      return;
    +    }
    +    if (!(key instanceof PersistentBase)) {
    +      // Both arguments now have a placeholder; previously the second one
    +      // (the expected base class) was silently dropped from the message.
    +      LOG.error("Key bean {} isn't extended by {}.",
    +          cassandraMapping.getKeyClass(), PersistentBase.class);
    +      return;
    +    }
    +    PersistentBase keyBase = (PersistentBase) key;
    +    for (Schema.Field field : keyBase.getSchema().getFields()) {
    +      if (cassandraMapping.getFieldFromFieldName(field.name()) == null) {
    +        LOG.debug("Ignoring field {}, Since field couldn't find in the {} mapping",
    +            field.name(), cassandraMapping.getPersistentClass());
    +        continue;
    +      }
    +      keys.add(field.name());
    +      Object value = keyBase.get(field.pos());
    +      values.add(getFieldValueFromAvroBean(field.schema(), field.schema().getType(), value));
    +    }
    +  }
    +
    +  /**
    +   * Prepares a single Avro field value for persistence, based on its schema type.
    +   * <p>
    +   * Handling per type:
    +   * <ul>
    +   * <li>RECORD: a record is obtained from {@code SpecificData.get().newRecord(..)};
    +   * every member except the one at position 0 is converted recursively and put
    +   * into it, but only while the source record reports itself as dirty;</li>
    +   * <li>MAP: copied into a {@link HashMap} with {@code String} keys and
    +   * recursively converted values;</li>
    +   * <li>ARRAY: copied into an {@link ArrayList} of recursively converted items;</li>
    +   * <li>UNION: a non-null value is converted using the union branch selected by
    +   * {@code getUnionSchema}; {@code null} is returned unchanged;</li>
    +   * <li>STRING: replaced by its {@code toString()} form;</li>
    +   * <li>anything else: returned as is.</li>
    +   * </ul>
    +   *
    +   * @param fieldSchema the associated field schema
    +   * @param type        the field type
    +   * @param fieldValue  the field value
    +   * @return the converted field value
    +   */
    +  static Object getFieldValueFromAvroBean(Schema fieldSchema, Schema.Type type,
    +                                          Object fieldValue) {
    +    switch (type) {
    +      case RECORD: {
    +        PersistentBase source = (PersistentBase) fieldValue;
    +        PersistentBase target =
    +            (PersistentBase) SpecificData.get().newRecord(source, source.getSchema());
    +        for (Schema.Field member : fieldSchema.getFields()) {
    +          if (member.pos() != 0 && source.isDirty()) {
    +            Schema memberSchema = member.schema();
    +            Object rawMember = source.get(member.pos());
    +            target.put(member.pos(),
    +                getFieldValueFromAvroBean(memberSchema, memberSchema.getType(), rawMember));
    +          }
    +        }
    +        return target;
    +      }
    +      case MAP: {
    +        Schema entrySchema = fieldSchema.getValueType();
    +        Schema.Type entryType = entrySchema.getType();
    +        HashMap<String, Object> converted = new HashMap<>();
    +        for (Map.Entry<CharSequence, ?> entry : ((Map<CharSequence, ?>) fieldValue).entrySet()) {
    +          converted.put(entry.getKey().toString(),
    +              getFieldValueFromAvroBean(entrySchema, entryType, entry.getValue()));
    +        }
    +        return converted;
    +      }
    +      case ARRAY: {
    +        Schema itemSchema = fieldSchema.getElementType();
    +        Schema.Type itemType = itemSchema.getType();
    +        ArrayList<Object> converted = new ArrayList<>();
    +        for (Object item : (Collection<?>) fieldValue) {
    +          converted.add(getFieldValueFromAvroBean(itemSchema, itemType, item));
    +        }
    +        return converted;
    +      }
    +      case UNION: {
    +        if (fieldValue == null) {
    +          return fieldValue;
    +        }
    +        // Resolve which branch of the union the value belongs to, then convert
    +        // the value as if it were declared with that branch's schema.
    +        Schema branch = fieldSchema.getTypes().get(getUnionSchema(fieldValue, fieldSchema));
    +        return getFieldValueFromAvroBean(branch, branch.getType(), fieldValue);
    +      }
    +      case STRING:
    +        return fieldValue.toString();
    +      default:
    +        return fieldValue;
    +    }
    +  }
    +
    +  /**
    +   * Finds the position, within a UNION schema, of the first branch whose type
    +   * matches the runtime class of the given value.
    +   * <p>
    +   * Branches are tested in declaration order. If no branch matches, the default
    +   * position {@code DEFAULT_UNION_SCHEMA} is returned.
    +   *
    +   * @param pValue       the value whose union branch is looked up
    +   * @param pUnionSchema the avro UNION schema
    +   * @return the position of the matching branch, or the default position
    +   */
    +  private static int getUnionSchema(Object pValue, Schema pUnionSchema) {
    +    List<Schema> branches = pUnionSchema.getTypes();
    +    for (int position = 0; position < branches.size(); position++) {
    +      boolean matches;
    +      switch (branches.get(position).getType()) {
    +        case STRING:
    +          matches = pValue instanceof CharSequence;
    +          break;
    +        case BYTES:
    +          matches = pValue instanceof ByteBuffer;
    +          break;
    +        case INT:
    +          matches = pValue instanceof Integer;
    +          break;
    +        case LONG:
    +          matches = pValue instanceof Long;
    +          break;
    +        case DOUBLE:
    +          matches = pValue instanceof Double;
    +          break;
    +        case FLOAT:
    +          matches = pValue instanceof Float;
    +          break;
    +        case BOOLEAN:
    +          matches = pValue instanceof Boolean;
    +          break;
    +        case MAP:
    +          matches = pValue instanceof Map;
    +          break;
    +        case ARRAY:
    +          matches = pValue instanceof List;
    +          break;
    +        case RECORD:
    +          matches = pValue instanceof Persistent;
    +          break;
    +        default:
    +          matches = false;
    +          break;
    +      }
    +      if (matches) {
    +        return position;
    +      }
    +    }
    +    // No branch matched the value's runtime type: fall back to the default.
    +    return DEFAULT_UNION_SCHEMA;
    +  }
    +
    +  static String encodeFieldKey(final String key) {
    --- End diff --
    
    This method doesn't have any usages.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to