[ https://issues.apache.org/jira/browse/GORA-535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544455#comment-16544455 ]
ASF GitHub Bot commented on GORA-535: ------------------------------------- Github user nishadi commented on a diff in the pull request: https://github.com/apache/gora/pull/134#discussion_r202533866 --- Diff: gora-ignite/src/main/java/org/apache/gora/ignite/store/IgniteStore.java --- @@ -0,0 +1,578 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gora.ignite.store; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; +import javax.sql.rowset.CachedRowSet; +import javax.sql.rowset.RowSetFactory; +import javax.sql.rowset.RowSetProvider; +import org.apache.avro.Schema; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificDatumWriter; +import org.apache.avro.util.Utf8; +import org.apache.gora.ignite.query.IgniteQuery; +import org.apache.gora.ignite.query.IgniteResult; +import org.apache.gora.ignite.utils.IgniteSQLBuilder; +import org.apache.gora.persistency.Persistent; +import org.apache.gora.persistency.impl.PersistentBase; +import org.apache.gora.query.PartitionQuery; +import org.apache.gora.query.Query; +import org.apache.gora.query.Result; +import org.apache.gora.query.impl.PartitionQueryImpl; +import org.apache.gora.store.impl.DataStoreBase; +import org.apache.gora.util.AvroUtils; +import org.apache.gora.util.GoraException; +import org.apache.gora.util.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation of a Ignite data store to be used by gora. + * + * @param <K> class to be used for the key + * @param <T> class to be persisted within the store + */ +public class IgniteStore<K, T extends PersistentBase> extends DataStoreBase<K, T> { + + public static final Logger LOG = LoggerFactory.getLogger(IgniteStore.class); + private static final String PARSE_MAPPING_FILE_KEY = "gora.ignite.mapping.file"; + private static final String DEFAULT_MAPPING_FILE = "gora-ignite-mapping.xml"; + private IgniteParameters igniteParameters; + private IgniteMapping igniteMapping; + private Connection connection; + + /* + * Create a threadlocal map for the datum readers and writers, because they + * are not thread safe, at least not before Avro 1.4.0 (See AVRO-650). When + * they are thread safe, it is possible to maintain a single reader and writer + * pair for every schema, instead of one for every thread. + */ + public static final ConcurrentHashMap<Schema, SpecificDatumReader<?>> readerMap = new ConcurrentHashMap<>(); + + public static final ConcurrentHashMap<Schema, SpecificDatumWriter<?>> writerMap = new ConcurrentHashMap<>(); + + @Override + public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) throws GoraException { + + try { + super.initialize(keyClass, persistentClass, properties); + IgniteMappingBuilder builder = new IgniteMappingBuilder(this); + builder.readMappingFile(getConf().get(PARSE_MAPPING_FILE_KEY, DEFAULT_MAPPING_FILE)); + igniteMapping = builder.getIgniteMapping(); + igniteParameters = IgniteParameters.load(properties, conf); + connection = acquiereConnection(); + LOG.info("Ignite store was successfully initialized"); + } catch (ClassNotFoundException | SQLException ex) { + LOG.error("Error while initializing Ignite store", ex); + throw new GoraException(ex); + } + } + + private Connection acquiereConnection() throws ClassNotFoundException, SQLException { + Class.forName("org.apache.ignite.IgniteJdbcThinDriver"); + StringBuilder urlBuilder = new StringBuilder(); + urlBuilder.append("jdbc:ignite:thin://"); + urlBuilder.append(igniteParameters.getHost()); + if (igniteParameters.getPort() != null) { + urlBuilder.append(":" + igniteParameters.getPort()); + } + if (igniteParameters.getSchema() != null) { + urlBuilder.append("/" + igniteParameters.getSchema()); + } + if (igniteParameters.getUser() != null) { + urlBuilder.append(";" + igniteParameters.getUser()); + } + if (igniteParameters.getPassword() != null) { + urlBuilder.append(";" + igniteParameters.getPassword()); + } + if (igniteParameters.getAdditionalConfigurations() != null) { + urlBuilder.append(igniteParameters.getAdditionalConfigurations()); + } + Connection conn = DriverManager.getConnection(urlBuilder.toString()); + return conn; + } + + @Override + public String getSchemaName() { + return igniteMapping.getTableName(); + } + + @Override + public String getSchemaName(final String mappingSchemaName, + final Class<?> persistentClass) { + return super.getSchemaName(mappingSchemaName, persistentClass); + } + + @Override + public void createSchema() throws GoraException { + if (connection == null) { + throw new GoraException( + "Impossible to create the schema as no connection has been initiated."); + } + if (schemaExists()) { + return; + } + try (Statement stmt = connection.createStatement()) { + String createTableSQL = IgniteSQLBuilder.createTable(igniteMapping); + stmt.executeUpdate(createTableSQL); + LOG.info("Table {} has been created for Ignite instance.", + igniteMapping.getTableName()); + } catch (SQLException ex) { + throw new GoraException(ex); + } + } + + @Override + public void deleteSchema() throws GoraException { + if (connection == null) { + throw new GoraException( + "Impossible to delete the schema as no connection has been initiated."); + } + try (Statement stmt = connection.createStatement()) { + String dropTableSQL = IgniteSQLBuilder.dropTable(igniteMapping.getTableName()); + stmt.executeUpdate(dropTableSQL); + LOG.info("Table {} has been dropped from Ignite instance.", + igniteMapping.getTableName()); + } catch (SQLException ex) { + throw new GoraException(ex); + } + } + + @Override + public boolean schemaExists() throws GoraException { + boolean exists = false; --- End diff -- Rather than having a separate variable, we can directly return true or false below. > Add a data store for Apache Ignite > ----------------------------------- > > Key: GORA-535 > URL: https://issues.apache.org/jira/browse/GORA-535 > Project: Apache Gora > Issue Type: New Feature > Reporter: Nishadi Kirielle > Priority: Major > Labels: gsoc2018 > > Currently, Gora has support for persisting objects to various database models > such as Apache Hbase, Apache Cassandra and much more. [1] This project aims > to extend its capability to provide support for Apache Ignite database. > Apache Ignite is a distributed database, caching and processing platform.[2] > [1]. [http://gora.apache.org/] > [2] . [https://ignite.apache.org/] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)