Thanks Thanuja, please add above details to the JIRA issue.
On Tue, Oct 20, 2015 at 5:11 PM, Thanuja Uruththirakodeeswaran <
[email protected]> wrote:
> Hi Imesh,
>
> Yes, INSERT INTO is not working for carbonJDBC tables which have primary
> keys set when we try to insert record which is already existing in JDBC
> table.
>
> If we set primary key for a JDBC table and in a scheduled spark script
> when we try to copy persisted events raw data from carbonAnaltyics table to
> JDBC table using 'INSERT INTO' query as below:
>
> CREATE TEMPORARY TABLE memberstatus
> USING CarbonAnalytics
> OPTIONS (tableName "MEMBER_LIFECYCLE");
>
> create temporary table member_status_new
> using CarbonJDBC options (dataSource
> "WSO2_ANALYTICS_PROCESSED_DATA_STORE_DB", tableName "MEMBER_STATUS_NEW");
>
> INSERT INTO TABLE member_status_new select timestamp,
> application_id, cluster_alias, member_id,
> member_status from memberstatus;
>
> where MEMBER_STATUS_NEW table is created before hand using following sql
> statement:
> CREATE TABLE ANALYTICS_PROCESSED_DATA_STORE.MEMBER_STATUS_NEW(Time BIGINT
> UNSIGNED, ApplicationId VARCHAR(150), ClusterAlias VARCHAR(150), MemberId
> VARCHAR(150), MemberStatus VARCHAR(50), Primary Key(Time, MemberId));
>
>
> I'm getting below error:
> [2015-10-20 15:38:30,305] ERROR
> {org.wso2.carbon.ntask.core.impl.TaskQuartzJobAdapter} - Error in
> executing task: Error while saving data to the table MEMBER_STATUS_NEW
> java.lang.RuntimeException: Error while saving data to the table
> MEMBER_STATUS_NEW
> at
> org.apache.spark.sql.jdbc.carbon.JDBCRelation.insert(JDBCRelation.scala:193)
> at org.apache.spark.sql.sources.InsertIntoDataSource.run(commands.scala:53)
> at
> org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
> at
> org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
> at
> org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:68)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:87)
> at
> org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:950)
> at
> org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:950)
> at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:144)
> at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:128)
> at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)
> at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:755)
> at
> org.wso2.carbon.analytics.spark.core.internal.SparkAnalyticsExecutor.executeQueryLocal(SparkAnalyticsExecutor.java:623)
> at
> org.wso2.carbon.analytics.spark.core.internal.SparkAnalyticsExecutor.executeQuery(SparkAnalyticsExecutor.java:605)
> at
> org.wso2.carbon.analytics.spark.core.CarbonAnalyticsProcessorService.executeQuery(CarbonAnalyticsProcessorService.java:199)
> at
> org.wso2.carbon.analytics.spark.core.CarbonAnalyticsProcessorService.executeScript(CarbonAnalyticsProcessorService.java:149)
> at
> org.wso2.carbon.analytics.spark.core.AnalyticsTask.execute(AnalyticsTask.java:57)
> at
> org.wso2.carbon.ntask.core.impl.TaskQuartzJobAdapter.execute(TaskQuartzJobAdapter.java:67)
> at org.quartz.core.JobRunShell.run(JobRunShell.java:213)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> Caused by:
> org.wso2.carbon.analytics.spark.core.exception.AnalyticsExecutionException:
> Error while saving data to the table MEMBER_STATUS_NEW
> at
> org.apache.spark.sql.jdbc.carbon.package$JDBCWriteDetails$.saveTable(carbon.scala:115)
> at
> org.apache.spark.sql.jdbc.carbon.JDBCRelation.insert(JDBCRelation.scala:188)
> ... 26 more
> Caused by: org.apache.spark.SparkException: Job aborted due to stage
> failure: Task 0 in stage 3584.0 failed 1 times, most recent failure: Lost
> task 0.0 in stage 3584.0 (TID 124672, localhost):
> com.mysql.jdbc.exceptions.jdbc4.MySQLIntegrityConstraintViolationException:
> Duplicate entry
> '1445331059446-single-cartridge-app.my-php.php.domain31185c44-697' for key
> 'PRIMARY'
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> at com.mysql.jdbc.Util.handleNewInstance(Util.java:400)
> at com.mysql.jdbc.Util.getInstance(Util.java:383)
> at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:973)
> at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3847)
> at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3783)
> at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2447)
> at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2594)
> at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2545)
> at
> com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1901)
> at
> com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement.java:2113)
> at
> com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement.java:2049)
> at
> com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement.java:2034)
> at
> org.apache.spark.sql.jdbc.carbon.package$JDBCWriteDetails$.savePartition(carbon.scala:175)
> at
> org.apache.spark.sql.jdbc.carbon.package$JDBCWriteDetails$$anonfun$saveTable$1.apply(carbon.scala:107)
> at
> org.apache.spark.sql.jdbc.carbon.package$JDBCWriteDetails$$anonfun$saveTable$1.apply(carbon.scala:106)
> at
> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:878)
> at
> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:878)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
> at org.apache.spark.scheduler.Task.run(Task.scala:70)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
> at scala.Option.foreach(Option.scala:236)
> at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>
> This is happening because every time the above 'INSERT INTO' script gets
> executed, it tries to copy entire data in memberstatus carbonAnalytics
> table to member_status_new carbonJDBC table and *at the time of trying to
> insert a already existing record*, the above error is thrown.
>
> But we can run a INSERT INTO statement on top of carbonAnalytics table
> which has primary keys set.
>
> Thanks.
>
> On Tue, Oct 20, 2015 at 3:25 PM, Imesh Gunaratne <[email protected]> wrote:
>
>> Hi Thanuja,
>>
>> Can you please explain the following statement that you have mentioned in
>> the JIRA?
>>
>> 2. Also we can't execute INSERT INTO for carbonJDBC table which has
>> primary key set.
>>
>>
>> This looks to me that INSERT INTO does not work with CarbonJDBC with
>> tables having primary keys.
>>
>> Thanks
>>
>>
>> On Tue, Oct 20, 2015 at 1:38 PM, Thanuja Uruththirakodeeswaran <
>> [email protected]> wrote:
>>
>>> Hi Niranda,
>>>
>>> I have created a jira [1] to track the issues we have in carbonJDBC
>>> option.
>>>
>>> [1]. https://wso2.org/jira/browse/DAS-273
>>>
>>> Thanks.
>>>
>>> On Thu, Oct 1, 2015 at 3:12 PM, Sinthuja Ragendran <[email protected]>
>>> wrote:
>>>
>>>> Hi Niranda,
>>>>
>>>> On Thu, Oct 1, 2015 at 2:28 PM, Inosh Goonewardena <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi Niranda,
>>>>>
>>>>> On Thu, Oct 1, 2015 at 1:33 PM, Sinthuja Ragendran <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi Niranda,
>>>>>>
>>>>>> On Thu, Oct 1, 2015 at 1:28 PM, Niranda Perera <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Thanuja and Imesh,
>>>>>>>
>>>>>>> let me clarify the use of the term "create temporary table" with
>>>>>>> regard to Spark.
>>>>>>> inside DAS we save ('persist') data in DAL (Dara access layer)
>>>>>>> tables. So in order for us to query these tables, spark needs some sort
>>>>>>> of
>>>>>>> a mapping to the tables in DAL in its runtime environment. This mapping
>>>>>>> is
>>>>>>> created in the temporary table queries. These temp tables are only a
>>>>>>> mapping. Not a physical table.
>>>>>>>
>>>>>>> @thanuja, yes you are correct! We have to manually create the tables
>>>>>>> in MySQL before making the temp table mapping in Spark SQL.
>>>>>>>
>>>>>> With Carbon JDBC connector, can we try to create the table if it is
>>>>>> not existing? May be we can let the users to pass the actual create table
>>>>>> statement as another parameter with options. IMHO it will be more user
>>>>>> friendly if we could do that, WDYT?
>>>>>>
>>>>>
>>>>> Yes. +1. For tables created using CarbonAnalytics it is possible to
>>>>> provide the table schema as below [1]. I believe we can use the similar
>>>>> approach in CarbonJDBC also to provide the create table query.
>>>>>
>>>>> As per the current implementation what happens is even though the
>>>>> table is created manually before the script execution, "insert
>>>>> overwrite..." statement execution will delete the original table and
>>>>> recreate a new table using a generated schema(schema information is
>>>>> generated using the original table structure). In this approach, table
>>>>> that
>>>>> is re-created at the query execution will not have primary keys and
>>>>> indexes
>>>>> of the original table(if there were any). So if we can provide a complete
>>>>> create table query, we can preserve original table structure too.
>>>>>
>>>>
>>>> Yeah, +1.
>>>>
>>>>
>>>>>
>>>>> On the other hand, I believe we should also support "insert into.."
>>>>> statements in CarbonJDBC. "insert into.." statements will not delete and
>>>>> recreate the table like the "insert overwrite..." statements, and it will
>>>>> only update the existing table[2].
>>>>>
>>>>
>>>> Yeah, I also have concern on this. Because currently the insert
>>>> overwrite statement drops the table, and repopulate the data entirely, and
>>>> hence the dashboard which reads from the table may be empty/partial data.
>>>> This is also an issue when we are purging the original data scenario, where
>>>> the summarised data will also be cleaned and no old data available to
>>>> repopulate the summarised data again. Can we have it as replace the row if
>>>> it's already existing in insert overwrite, rather dropping the entire table
>>>> to avoid such issues?
>>>>
>>>> Thanks,
>>>> Sinthuja.
>>>>
>>>>
>>>>>
>>>>> [1] CREATE TEMPORARY TABLE plugUsage
>>>>> USING CarbonAnalytics
>>>>> OPTIONS (tableName "plug_usage",
>>>>> * schema "house_id INT, household_id INT, plug_id INT, usage
>>>>> FLOAT"*,
>>>>> primaryKeys "household_id, plug_id"
>>>>> );
>>>>>
>>>>> [2] [Dev] [Architecture] Carbon Spark JDBC connector
>>>>>
>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>> Sinthuja.
>>>>>>
>>>>>>
>>>>>>> On Thu, Oct 1, 2015 at 9:53 AM, Thanuja Uruththirakodeeswaran <
>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>
>>>>>>>> In DAS spark environment, we can't directly insert the analyzed
>>>>>>>> data to our mysql table. We should create a temporary table using our
>>>>>>>> datasources to manipulate them.
>>>>>>>>
>>>>>>>
>>>>>>> Yes, can you please explain the reasons? What does this analysis do?
>>>>>>> Why we cannot directly insert them to the RDBMS?
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>> On Thu, Oct 1, 2015 at 9:53 AM, Thanuja Uruththirakodeeswaran <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Hi Imesh,
>>>>>>>>
>>>>>>>> If we take the above scenario, I need to insert the
>>>>>>>> analyzed/aggregated data which is obtained as result after spark sql
>>>>>>>> processing, to my mysql table (sample_table). In order to do that,
>>>>>>>> first we
>>>>>>>> need to create a temporary table using the corresponding mysql database
>>>>>>>> (sample_datasource) and table(sample_table) in spark environment and
>>>>>>>> then
>>>>>>>> only by inserting data to this temporary table in spark environment,
>>>>>>>> we can
>>>>>>>> update our mysql table.
>>>>>>>>
>>>>>>>> In DAS spark environment, we can't directly insert the analyzed
>>>>>>>> data to our mysql table. We should create a temporary table using our
>>>>>>>> datasources to manipulate them. I think that's why they named it as '
>>>>>>>> *temporary*' table.
>>>>>>>>
>>>>>>>> @Niranda Please correct me if I'm wrong.
>>>>>>>>
>>>>>>>> Thanks.
>>>>>>>>
>>>>>>>> On Thu, Oct 1, 2015 at 7:00 AM, Imesh Gunaratne <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Thanuja,
>>>>>>>>>
>>>>>>>>> Can you please explain the purpose of these temporary tables?
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>>
>>>>>>>>> On Wed, Sep 30, 2015 at 11:53 PM, Thanuja Uruththirakodeeswaran <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi All,
>>>>>>>>>>
>>>>>>>>>> When we create temporary tables in spark environment using
>>>>>>>>>> carbonJDBC option as explained in [1], we are using a datasource and
>>>>>>>>>> tableName from which spark environment temporary table will get data
>>>>>>>>>> as
>>>>>>>>>> follow:
>>>>>>>>>> CREATE TEMPORARY TABLE <temp_table> using CarbonJDBC options
>>>>>>>>>> (dataSource "<datasource name>", tableName "<table name>");
>>>>>>>>>>
>>>>>>>>>> I've used a mysql database (sample_datasource) for datasource and
>>>>>>>>>> used mysql tables created in that database for tableName
>>>>>>>>>> (sample_table) as
>>>>>>>>>> follow:
>>>>>>>>>> CREATE TEMPORARY TABLE sample using CarbonJDBC options
>>>>>>>>>> (dataSource "sample_datasource", tableName "sample_table");
>>>>>>>>>>
>>>>>>>>>> But I'm creating the mysql database and tables by executing sql
>>>>>>>>>> statements manually. Is there a way in DAS that we can add these sql
>>>>>>>>>> statements inside a script and create the database and tables when
>>>>>>>>>> we start
>>>>>>>>>> the server?
>>>>>>>>>>
>>>>>>>>>> [1]. https://docs.wso2.com/display/DAS300/Spark+Query+Language
>>>>>>>>>>
>>>>>>>>>> Thanks.
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Thanuja Uruththirakodeeswaran
>>>>>>>>>> Software Engineer
>>>>>>>>>> WSO2 Inc.;http://wso2.com
>>>>>>>>>> lean.enterprise.middleware
>>>>>>>>>>
>>>>>>>>>> mobile: +94 774363167
>>>>>>>>>>
>>>>>>>>>> _______________________________________________
>>>>>>>>>> Dev mailing list
>>>>>>>>>> [email protected]
>>>>>>>>>> http://wso2.org/cgi-bin/mailman/listinfo/dev
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> *Imesh Gunaratne*
>>>>>>>>> Senior Technical Lead
>>>>>>>>> WSO2 Inc: http://wso2.com
>>>>>>>>> T: +94 11 214 5345 M: +94 77 374 2057
>>>>>>>>> W: http://imesh.gunaratne.org
>>>>>>>>> Lean . Enterprise . Middleware
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Thanuja Uruththirakodeeswaran
>>>>>>>> Software Engineer
>>>>>>>> WSO2 Inc.;http://wso2.com
>>>>>>>> lean.enterprise.middleware
>>>>>>>>
>>>>>>>> mobile: +94 774363167
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> *Imesh Gunaratne*
>>>>>>> Senior Technical Lead
>>>>>>> WSO2 Inc: http://wso2.com
>>>>>>> T: +94 11 214 5345 M: +94 77 374 2057
>>>>>>> W: http://imesh.gunaratne.org
>>>>>>> Lean . Enterprise . Middleware
>>>>>>>
>>>>>>>
>>>>>>> _______________________________________________
>>>>>>> Dev mailing list
>>>>>>> [email protected]
>>>>>>> http://wso2.org/cgi-bin/mailman/listinfo/dev
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> *Sinthuja Rajendran*
>>>>>> Associate Technical Lead
>>>>>> WSO2, Inc.:http://wso2.com
>>>>>>
>>>>>> Blog: http://sinthu-rajan.blogspot.com/
>>>>>> Mobile: +94774273955
>>>>>>
>>>>>>
>>>>>>
>>>>>> _______________________________________________
>>>>>> Dev mailing list
>>>>>> [email protected]
>>>>>> http://wso2.org/cgi-bin/mailman/listinfo/dev
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Thanks & Regards,
>>>>>
>>>>> Inosh Goonewardena
>>>>> Associate Technical Lead- WSO2 Inc.
>>>>> Mobile: +94779966317
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> *Sinthuja Rajendran*
>>>> Associate Technical Lead
>>>> WSO2, Inc.:http://wso2.com
>>>>
>>>> Blog: http://sinthu-rajan.blogspot.com/
>>>> Mobile: +94774273955
>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Thanuja Uruththirakodeeswaran
>>> Software Engineer
>>> WSO2 Inc.;http://wso2.com
>>> lean.enterprise.middleware
>>>
>>> mobile: +94 774363167
>>>
>>
>>
>>
>> --
>> *Imesh Gunaratne*
>> Senior Technical Lead
>> WSO2 Inc: http://wso2.com
>> T: +94 11 214 5345 M: +94 77 374 2057
>> W: http://imesh.gunaratne.org
>> Lean . Enterprise . Middleware
>>
>>
>
>
> --
> Thanuja Uruththirakodeeswaran
> Software Engineer
> WSO2 Inc.;http://wso2.com
> lean.enterprise.middleware
>
> mobile: +94 774363167
>
--
*Imesh Gunaratne*
Senior Technical Lead
WSO2 Inc: http://wso2.com
T: +94 11 214 5345 M: +94 77 374 2057
W: http://imesh.gunaratne.org
Lean . Enterprise . Middleware
_______________________________________________
Dev mailing list
[email protected]
http://wso2.org/cgi-bin/mailman/listinfo/dev