Github user djkevincr commented on a diff in the pull request:

    https://github.com/apache/gora/pull/134#discussion_r211519945
  
    --- Diff: 
gora-ignite/src/main/java/org/apache/gora/ignite/store/IgniteStore.java ---
    @@ -0,0 +1,565 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.gora.ignite.store;
    +
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.sql.Connection;
    +import java.sql.DriverManager;
    +import java.sql.PreparedStatement;
    +import java.sql.ResultSet;
    +import java.sql.SQLException;
    +import java.sql.Statement;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Properties;
    +import java.util.concurrent.ConcurrentHashMap;
    +import javax.sql.rowset.CachedRowSet;
    +import javax.sql.rowset.RowSetFactory;
    +import javax.sql.rowset.RowSetProvider;
    +import org.apache.avro.Schema;
    +import org.apache.avro.specific.SpecificDatumReader;
    +import org.apache.avro.specific.SpecificDatumWriter;
    +import org.apache.avro.util.Utf8;
    +import org.apache.gora.ignite.query.IgniteQuery;
    +import org.apache.gora.ignite.query.IgniteResult;
    +import org.apache.gora.ignite.utils.IgniteBackendConstants;
    +import org.apache.gora.ignite.utils.IgniteSQLBuilder;
    +import org.apache.gora.persistency.Persistent;
    +import org.apache.gora.persistency.impl.PersistentBase;
    +import org.apache.gora.query.PartitionQuery;
    +import org.apache.gora.query.Query;
    +import org.apache.gora.query.Result;
    +import org.apache.gora.query.impl.PartitionQueryImpl;
    +import org.apache.gora.store.impl.DataStoreBase;
    +import org.apache.gora.util.AvroUtils;
    +import org.apache.gora.util.GoraException;
    +import org.apache.gora.util.IOUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Implementation of a Ignite data store to be used by gora.
    + *
    + * @param <K> class to be used for the key
    + * @param <T> class to be persisted within the store
    + */
    +public class IgniteStore<K, T extends PersistentBase> extends 
DataStoreBase<K, T> {
    +
    +  private static final Logger LOG = 
LoggerFactory.getLogger(IgniteStore.class);
    +  private static final String PARSE_MAPPING_FILE_KEY = 
"gora.ignite.mapping.file";
    +  private static final String DEFAULT_MAPPING_FILE = 
"gora-ignite-mapping.xml";
    +  private IgniteParameters igniteParameters;
    +  private IgniteMapping igniteMapping;
    +  private Connection connection;
    +  private static final ConcurrentHashMap<Schema, SpecificDatumReader<?>> 
readerMap = new ConcurrentHashMap<>();
    +  private static final ConcurrentHashMap<Schema, SpecificDatumWriter<?>> 
writerMap = new ConcurrentHashMap<>();
    +
    +  @Override
    +  public void initialize(Class<K> keyClass, Class<T> persistentClass, 
Properties properties) throws GoraException {
    +    try {
    +      super.initialize(keyClass, persistentClass, properties);
    +      IgniteMappingBuilder<K, T> builder = new IgniteMappingBuilder<K, 
T>(this);
    +      builder.readMappingFile(getConf().get(PARSE_MAPPING_FILE_KEY, 
DEFAULT_MAPPING_FILE));
    +      igniteMapping = builder.getIgniteMapping();
    +      igniteParameters = IgniteParameters.load(properties);
    +      connection = acquireConnection();
    +      LOG.info("Ignite store was successfully initialized");
    +    } catch (ClassNotFoundException | SQLException ex) {
    +      LOG.error("Error while initializing Ignite store", ex);
    +      throw new GoraException(ex);
    +    }
    +  }
    +
    +  private Connection acquireConnection() throws ClassNotFoundException, 
SQLException {
    +    Class.forName("org.apache.ignite.IgniteJdbcThinDriver");
    +    StringBuilder urlBuilder = new StringBuilder();
    +    urlBuilder.append("jdbc:ignite:thin://");
    +    urlBuilder.append(igniteParameters.getHost());
    +    if (igniteParameters.getPort() != null) {
    +      urlBuilder.append(":" + igniteParameters.getPort());
    +    }
    +    if (igniteParameters.getSchema() != null) {
    +      urlBuilder.append("/" + igniteParameters.getSchema());
    +    }
    +    if (igniteParameters.getUser() != null) {
    +      urlBuilder.append(";" + igniteParameters.getUser());
    +    }
    +    if (igniteParameters.getPassword() != null) {
    +      urlBuilder.append(";" + igniteParameters.getPassword());
    +    }
    +    if (igniteParameters.getAdditionalConfigurations() != null) {
    +      urlBuilder.append(igniteParameters.getAdditionalConfigurations());
    +    }
    +    return DriverManager.getConnection(urlBuilder.toString());
    +  }
    +
    +  @Override
    +  public String getSchemaName() {
    +    return igniteMapping.getTableName();
    +  }
    +
    +  @Override
    +  public String getSchemaName(final String mappingSchemaName,
    +      final Class<?> persistentClass) {
    +    return super.getSchemaName(mappingSchemaName, persistentClass);
    +  }
    +
    +  @Override
    +  public void createSchema() throws GoraException {
    +    if (connection == null) {
    +      throw new GoraException(
    +          "Impossible to create the schema as no connection has been 
initiated.");
    +    }
    +    if (schemaExists()) {
    +      return;
    +    }
    +    try (Statement stmt = connection.createStatement()) {
    +      String createTableSQL = IgniteSQLBuilder.createTable(igniteMapping);
    +      stmt.executeUpdate(createTableSQL);
    +      LOG.info("Table {} has been created for Ignite instance.",
    +          igniteMapping.getTableName());
    +    } catch (SQLException ex) {
    +      throw new GoraException(ex);
    +    }
    +  }
    +
    +  @Override
    +  public void deleteSchema() throws GoraException {
    +    if (connection == null) {
    +      throw new GoraException(
    +          "Impossible to delete the schema as no connection has been 
initiated.");
    +    }
    +    try (Statement stmt = connection.createStatement()) {
    +      String dropTableSQL = 
IgniteSQLBuilder.dropTable(igniteMapping.getTableName());
    +      stmt.executeUpdate(dropTableSQL);
    +      LOG.info("Table {} has been dropped from Ignite instance.",
    +          igniteMapping.getTableName());
    +    } catch (SQLException ex) {
    +      throw new GoraException(ex);
    +    }
    +  }
    +
    +  @Override
    +  public boolean schemaExists() throws GoraException {
    +    try (Statement stmt = connection.createStatement()) {
    +      String tableExistsSQL = 
IgniteSQLBuilder.tableExists(igniteMapping.getTableName());
    +      ResultSet executeQuery = stmt.executeQuery(tableExistsSQL);
    +      executeQuery.close();
    +      return true;
    +    } catch (SQLException ex) {
    +      if (ex.getSQLState() != null
    +          && 
ex.getSQLState().equals(IgniteBackendConstants.DEFAULT_IGNITE_TABLE_NOT_EXISTS_CODE))
 {
    +        return false;
    +      } else {
    +        throw new GoraException(ex);
    +      }
    +    }
    +  }
    +
    +  @Override
    +  public T get(K key, String[] fields) throws GoraException {
    +    String[] avFields = getFieldsToQuery(fields);
    +    Object[] keyl = null;
    +    if (igniteMapping.getPrimaryKey().size() == 1) {
    +      keyl = new Object[]{key};
    +    } else {
    +      //Composite key pending
    +    }
    +    //Avro fields to Ignite fields
    +    List<String> dbFields = new ArrayList<>();
    +    for (String af : avFields) {
    +      dbFields.add(igniteMapping.getFields().get(af).getName());
    +    }
    +    String selectQuery = IgniteSQLBuilder.selectGet(igniteMapping, 
dbFields);
    +    try (PreparedStatement stmt = 
connection.prepareStatement(selectQuery)) {
    +      IgniteSQLBuilder.fillSelectStatement(stmt, igniteMapping, keyl);
    +      ResultSet rs = stmt.executeQuery();
    +      boolean data = rs.next();
    +      T resp = null;
    +      if (data) {
    +        resp = newInstance(rs, fields);
    +        if (rs.next()) {
    +          LOG.warn("Multiple results for primary key {} in the schema {}, 
ignoring additional rows.", keyl, igniteMapping.getTableName());
    +        }
    +      }
    +      rs.close();
    +      return resp;
    +    } catch (SQLException | IOException ex) {
    +      throw new GoraException(ex);
    +    }
    +
    +  }
    +
    +  public T newInstance(ResultSet rs, String[] fields) throws 
GoraException, SQLException, IOException {
    +    fields = getFieldsToQuery(fields);
    +    T persistent = newPersistent();
    +    for (String f : fields) {
    +      Schema.Field field = fieldMap.get(f);
    +      Schema fieldSchema = field.schema();
    +      String dbField = igniteMapping.getFields().get(f).getName();
    +      Object sv = rs.getObject(dbField);
    +      if (sv == null) {
    +        continue;
    +      }
    +      Object v = deserializeFieldValue(field, fieldSchema, sv, persistent);
    +      persistent.put(field.pos(), v);
    +      persistent.setDirty(field.pos());
    +    }
    +    return persistent;
    +  }
    +
    +  @SuppressWarnings("unchecked")
    +  private Object deserializeFieldValue(Schema.Field field, Schema 
fieldSchema,
    +      Object igniteValue, T persistent) throws IOException {
    +    Object fieldValue = null;
    +    switch (fieldSchema.getType()) {
    +      case MAP:
    +      case ARRAY:
    +      case RECORD:
    +        @SuppressWarnings("rawtypes") SpecificDatumReader reader = 
getDatumReader(fieldSchema);
    +        fieldValue = IOUtils.deserialize((byte[]) igniteValue, reader,
    +            persistent.get(field.pos()));
    +        break;
    +      case ENUM:
    +        fieldValue = AvroUtils.getEnumValue(fieldSchema, 
igniteValue.toString());
    +        break;
    +      case FIXED:
    +        break;
    +      case BYTES:
    +        fieldValue = ByteBuffer.wrap((byte[]) igniteValue);
    +        break;
    +      case STRING:
    +        fieldValue = new Utf8(igniteValue.toString());
    +        break;
    +      case UNION:
    +        if (fieldSchema.getTypes().size() == 2 && isNullable(fieldSchema)) 
{
    +          int schemaPos = getUnionSchema(igniteValue, fieldSchema);
    +          Schema unionSchema = fieldSchema.getTypes().get(schemaPos);
    +          fieldValue = deserializeFieldValue(field, unionSchema, 
igniteValue, persistent);
    +        } else {
    +          reader = getDatumReader(fieldSchema);
    +          fieldValue = IOUtils.deserialize((byte[]) igniteValue, reader,
    +              persistent.get(field.pos()));
    +        }
    +        break;
    +      default:
    +        fieldValue = igniteValue;
    +    }
    +    return fieldValue;
    +
    +  }
    +
    +  @Override
    +  public void put(K key, T obj) throws GoraException {
    +    try {
    +      if (obj.isDirty()) {
    +        Schema schema = obj.getSchema();
    +        List<Schema.Field> fields = schema.getFields();
    +        Map<Column, Object> data = new HashMap<>();
    +        if (igniteMapping.getPrimaryKey().size() == 1) {
    +          Column getKey = igniteMapping.getPrimaryKey().get(0);
    +          data.put(getKey, key);
    +        } else {
    +          //Composite keys pending..
    --- End diff --
    
    You may create JIRA ticket for the composite key feature as it is not 
supported at the moment.
    You may refer that JIRA ticket ID from comment it self.


---

Reply via email to