Hi Imesh,
Yes, INSERT INTO is not working for carbonJDBC tables which have primary
keys set when we try to insert record which is already existing in JDBC
table.
If we set primary key for a JDBC table and in a scheduled spark script when
we try to copy persisted events raw data from carbonAnaltyics table to JDBC
table using 'INSERT INTO' query as below:
CREATE TEMPORARY TABLE memberstatus
USING CarbonAnalytics
OPTIONS (tableName "MEMBER_LIFECYCLE");
create temporary table member_status_new
using CarbonJDBC options (dataSource
"WSO2_ANALYTICS_PROCESSED_DATA_STORE_DB", tableName "MEMBER_STATUS_NEW");
INSERT INTO TABLE member_status_new select timestamp,
application_id, cluster_alias, member_id,
member_status from memberstatus;
where MEMBER_STATUS_NEW table is created before hand using following sql
statement:
CREATE TABLE ANALYTICS_PROCESSED_DATA_STORE.MEMBER_STATUS_NEW(Time BIGINT
UNSIGNED, ApplicationId VARCHAR(150), ClusterAlias VARCHAR(150), MemberId
VARCHAR(150), MemberStatus VARCHAR(50), Primary Key(Time, MemberId));
I'm getting below error:
[2015-10-20 15:38:30,305] ERROR
{org.wso2.carbon.ntask.core.impl.TaskQuartzJobAdapter} - Error in
executing task: Error while saving data to the table MEMBER_STATUS_NEW
java.lang.RuntimeException: Error while saving data to the table
MEMBER_STATUS_NEW
at
org.apache.spark.sql.jdbc.carbon.JDBCRelation.insert(JDBCRelation.scala:193)
at org.apache.spark.sql.sources.InsertIntoDataSource.run(commands.scala:53)
at
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
at
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
at
org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:68)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:87)
at
org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:950)
at
org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:950)
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:144)
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:128)
at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:755)
at
org.wso2.carbon.analytics.spark.core.internal.SparkAnalyticsExecutor.executeQueryLocal(SparkAnalyticsExecutor.java:623)
at
org.wso2.carbon.analytics.spark.core.internal.SparkAnalyticsExecutor.executeQuery(SparkAnalyticsExecutor.java:605)
at
org.wso2.carbon.analytics.spark.core.CarbonAnalyticsProcessorService.executeQuery(CarbonAnalyticsProcessorService.java:199)
at
org.wso2.carbon.analytics.spark.core.CarbonAnalyticsProcessorService.executeScript(CarbonAnalyticsProcessorService.java:149)
at
org.wso2.carbon.analytics.spark.core.AnalyticsTask.execute(AnalyticsTask.java:57)
at
org.wso2.carbon.ntask.core.impl.TaskQuartzJobAdapter.execute(TaskQuartzJobAdapter.java:67)
at org.quartz.core.JobRunShell.run(JobRunShell.java:213)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by:
org.wso2.carbon.analytics.spark.core.exception.AnalyticsExecutionException:
Error while saving data to the table MEMBER_STATUS_NEW
at
org.apache.spark.sql.jdbc.carbon.package$JDBCWriteDetails$.saveTable(carbon.scala:115)
at
org.apache.spark.sql.jdbc.carbon.JDBCRelation.insert(JDBCRelation.scala:188)
... 26 more
Caused by: org.apache.spark.SparkException: Job aborted due to stage
failure: Task 0 in stage 3584.0 failed 1 times, most recent failure: Lost
task 0.0 in stage 3584.0 (TID 124672, localhost):
com.mysql.jdbc.exceptions.jdbc4.MySQLIntegrityConstraintViolationException:
Duplicate entry
'1445331059446-single-cartridge-app.my-php.php.domain31185c44-697' for key
'PRIMARY'
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at com.mysql.jdbc.Util.handleNewInstance(Util.java:400)
at com.mysql.jdbc.Util.getInstance(Util.java:383)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:973)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3847)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3783)
at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2447)
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2594)
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2545)
at
com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1901)
at
com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement.java:2113)
at
com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement.java:2049)
at
com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement.java:2034)
at
org.apache.spark.sql.jdbc.carbon.package$JDBCWriteDetails$.savePartition(carbon.scala:175)
at
org.apache.spark.sql.jdbc.carbon.package$JDBCWriteDetails$$anonfun$saveTable$1.apply(carbon.scala:107)
at
org.apache.spark.sql.jdbc.carbon.package$JDBCWriteDetails$$anonfun$saveTable$1.apply(carbon.scala:106)
at
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:878)
at
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:878)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
This is happening because every time the above 'INSERT INTO' script gets
executed, it tries to copy entire data in memberstatus carbonAnalytics
table to member_status_new carbonJDBC table and *at the time of trying to
insert a already existing record*, the above error is thrown.
But we can run a INSERT INTO statement on top of carbonAnalytics table
which has primary keys set.
Thanks.
On Tue, Oct 20, 2015 at 3:25 PM, Imesh Gunaratne <[email protected]> wrote:
> Hi Thanuja,
>
> Can you please explain the following statement that you have mentioned in
> the JIRA?
>
> 2. Also we can't execute INSERT INTO for carbonJDBC table which has
> primary key set.
>
>
> This looks to me that INSERT INTO does not work with CarbonJDBC with
> tables having primary keys.
>
> Thanks
>
>
> On Tue, Oct 20, 2015 at 1:38 PM, Thanuja Uruththirakodeeswaran <
> [email protected]> wrote:
>
>> Hi Niranda,
>>
>> I have created a jira [1] to track the issues we have in carbonJDBC
>> option.
>>
>> [1]. https://wso2.org/jira/browse/DAS-273
>>
>> Thanks.
>>
>> On Thu, Oct 1, 2015 at 3:12 PM, Sinthuja Ragendran <[email protected]>
>> wrote:
>>
>>> Hi Niranda,
>>>
>>> On Thu, Oct 1, 2015 at 2:28 PM, Inosh Goonewardena <[email protected]>
>>> wrote:
>>>
>>>> Hi Niranda,
>>>>
>>>> On Thu, Oct 1, 2015 at 1:33 PM, Sinthuja Ragendran <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi Niranda,
>>>>>
>>>>> On Thu, Oct 1, 2015 at 1:28 PM, Niranda Perera <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi Thanuja and Imesh,
>>>>>>
>>>>>> let me clarify the use of the term "create temporary table" with
>>>>>> regard to Spark.
>>>>>> inside DAS we save ('persist') data in DAL (Dara access layer)
>>>>>> tables. So in order for us to query these tables, spark needs some sort
>>>>>> of
>>>>>> a mapping to the tables in DAL in its runtime environment. This mapping
>>>>>> is
>>>>>> created in the temporary table queries. These temp tables are only a
>>>>>> mapping. Not a physical table.
>>>>>>
>>>>>> @thanuja, yes you are correct! We have to manually create the tables
>>>>>> in MySQL before making the temp table mapping in Spark SQL.
>>>>>>
>>>>> With Carbon JDBC connector, can we try to create the table if it is
>>>>> not existing? May be we can let the users to pass the actual create table
>>>>> statement as another parameter with options. IMHO it will be more user
>>>>> friendly if we could do that, WDYT?
>>>>>
>>>>
>>>> Yes. +1. For tables created using CarbonAnalytics it is possible to
>>>> provide the table schema as below [1]. I believe we can use the similar
>>>> approach in CarbonJDBC also to provide the create table query.
>>>>
>>>> As per the current implementation what happens is even though the table
>>>> is created manually before the script execution, "insert overwrite..."
>>>> statement execution will delete the original table and recreate a new table
>>>> using a generated schema(schema information is generated using the original
>>>> table structure). In this approach, table that is re-created at the query
>>>> execution will not have primary keys and indexes of the original table(if
>>>> there were any). So if we can provide a complete create table query, we can
>>>> preserve original table structure too.
>>>>
>>>
>>> Yeah, +1.
>>>
>>>
>>>>
>>>> On the other hand, I believe we should also support "insert into.."
>>>> statements in CarbonJDBC. "insert into.." statements will not delete and
>>>> recreate the table like the "insert overwrite..." statements, and it will
>>>> only update the existing table[2].
>>>>
>>>
>>> Yeah, I also have concern on this. Because currently the insert
>>> overwrite statement drops the table, and repopulate the data entirely, and
>>> hence the dashboard which reads from the table may be empty/partial data.
>>> This is also an issue when we are purging the original data scenario, where
>>> the summarised data will also be cleaned and no old data available to
>>> repopulate the summarised data again. Can we have it as replace the row if
>>> it's already existing in insert overwrite, rather dropping the entire table
>>> to avoid such issues?
>>>
>>> Thanks,
>>> Sinthuja.
>>>
>>>
>>>>
>>>> [1] CREATE TEMPORARY TABLE plugUsage
>>>> USING CarbonAnalytics
>>>> OPTIONS (tableName "plug_usage",
>>>> * schema "house_id INT, household_id INT, plug_id INT, usage
>>>> FLOAT"*,
>>>> primaryKeys "household_id, plug_id"
>>>> );
>>>>
>>>> [2] [Dev] [Architecture] Carbon Spark JDBC connector
>>>>
>>>>
>>>>>
>>>>> Thanks,
>>>>> Sinthuja.
>>>>>
>>>>>
>>>>>> On Thu, Oct 1, 2015 at 9:53 AM, Thanuja Uruththirakodeeswaran <
>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>
>>>>>>> In DAS spark environment, we can't directly insert the analyzed data
>>>>>>> to our mysql table. We should create a temporary table using our
>>>>>>> datasources to manipulate them.
>>>>>>>
>>>>>>
>>>>>> Yes, can you please explain the reasons? What does this analysis do?
>>>>>> Why we cannot directly insert them to the RDBMS?
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> On Thu, Oct 1, 2015 at 9:53 AM, Thanuja Uruththirakodeeswaran <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Hi Imesh,
>>>>>>>
>>>>>>> If we take the above scenario, I need to insert the
>>>>>>> analyzed/aggregated data which is obtained as result after spark sql
>>>>>>> processing, to my mysql table (sample_table). In order to do that,
>>>>>>> first we
>>>>>>> need to create a temporary table using the corresponding mysql database
>>>>>>> (sample_datasource) and table(sample_table) in spark environment and
>>>>>>> then
>>>>>>> only by inserting data to this temporary table in spark environment, we
>>>>>>> can
>>>>>>> update our mysql table.
>>>>>>>
>>>>>>> In DAS spark environment, we can't directly insert the analyzed data
>>>>>>> to our mysql table. We should create a temporary table using our
>>>>>>> datasources to manipulate them. I think that's why they named it as '
>>>>>>> *temporary*' table.
>>>>>>>
>>>>>>> @Niranda Please correct me if I'm wrong.
>>>>>>>
>>>>>>> Thanks.
>>>>>>>
>>>>>>> On Thu, Oct 1, 2015 at 7:00 AM, Imesh Gunaratne <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Thanuja,
>>>>>>>>
>>>>>>>> Can you please explain the purpose of these temporary tables?
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>> On Wed, Sep 30, 2015 at 11:53 PM, Thanuja Uruththirakodeeswaran <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> When we create temporary tables in spark environment using
>>>>>>>>> carbonJDBC option as explained in [1], we are using a datasource and
>>>>>>>>> tableName from which spark environment temporary table will get data
>>>>>>>>> as
>>>>>>>>> follow:
>>>>>>>>> CREATE TEMPORARY TABLE <temp_table> using CarbonJDBC options
>>>>>>>>> (dataSource "<datasource name>", tableName "<table name>");
>>>>>>>>>
>>>>>>>>> I've used a mysql database (sample_datasource) for datasource and
>>>>>>>>> used mysql tables created in that database for tableName
>>>>>>>>> (sample_table) as
>>>>>>>>> follow:
>>>>>>>>> CREATE TEMPORARY TABLE sample using CarbonJDBC options
>>>>>>>>> (dataSource "sample_datasource", tableName "sample_table");
>>>>>>>>>
>>>>>>>>> But I'm creating the mysql database and tables by executing sql
>>>>>>>>> statements manually. Is there a way in DAS that we can add these sql
>>>>>>>>> statements inside a script and create the database and tables when we
>>>>>>>>> start
>>>>>>>>> the server?
>>>>>>>>>
>>>>>>>>> [1]. https://docs.wso2.com/display/DAS300/Spark+Query+Language
>>>>>>>>>
>>>>>>>>> Thanks.
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Thanuja Uruththirakodeeswaran
>>>>>>>>> Software Engineer
>>>>>>>>> WSO2 Inc.;http://wso2.com
>>>>>>>>> lean.enterprise.middleware
>>>>>>>>>
>>>>>>>>> mobile: +94 774363167
>>>>>>>>>
>>>>>>>>> _______________________________________________
>>>>>>>>> Dev mailing list
>>>>>>>>> [email protected]
>>>>>>>>> http://wso2.org/cgi-bin/mailman/listinfo/dev
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> *Imesh Gunaratne*
>>>>>>>> Senior Technical Lead
>>>>>>>> WSO2 Inc: http://wso2.com
>>>>>>>> T: +94 11 214 5345 M: +94 77 374 2057
>>>>>>>> W: http://imesh.gunaratne.org
>>>>>>>> Lean . Enterprise . Middleware
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Thanuja Uruththirakodeeswaran
>>>>>>> Software Engineer
>>>>>>> WSO2 Inc.;http://wso2.com
>>>>>>> lean.enterprise.middleware
>>>>>>>
>>>>>>> mobile: +94 774363167
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> *Imesh Gunaratne*
>>>>>> Senior Technical Lead
>>>>>> WSO2 Inc: http://wso2.com
>>>>>> T: +94 11 214 5345 M: +94 77 374 2057
>>>>>> W: http://imesh.gunaratne.org
>>>>>> Lean . Enterprise . Middleware
>>>>>>
>>>>>>
>>>>>> _______________________________________________
>>>>>> Dev mailing list
>>>>>> [email protected]
>>>>>> http://wso2.org/cgi-bin/mailman/listinfo/dev
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> *Sinthuja Rajendran*
>>>>> Associate Technical Lead
>>>>> WSO2, Inc.:http://wso2.com
>>>>>
>>>>> Blog: http://sinthu-rajan.blogspot.com/
>>>>> Mobile: +94774273955
>>>>>
>>>>>
>>>>>
>>>>> _______________________________________________
>>>>> Dev mailing list
>>>>> [email protected]
>>>>> http://wso2.org/cgi-bin/mailman/listinfo/dev
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Thanks & Regards,
>>>>
>>>> Inosh Goonewardena
>>>> Associate Technical Lead- WSO2 Inc.
>>>> Mobile: +94779966317
>>>>
>>>
>>>
>>>
>>> --
>>> *Sinthuja Rajendran*
>>> Associate Technical Lead
>>> WSO2, Inc.:http://wso2.com
>>>
>>> Blog: http://sinthu-rajan.blogspot.com/
>>> Mobile: +94774273955
>>>
>>>
>>>
>>
>>
>> --
>> Thanuja Uruththirakodeeswaran
>> Software Engineer
>> WSO2 Inc.;http://wso2.com
>> lean.enterprise.middleware
>>
>> mobile: +94 774363167
>>
>
>
>
> --
> *Imesh Gunaratne*
> Senior Technical Lead
> WSO2 Inc: http://wso2.com
> T: +94 11 214 5345 M: +94 77 374 2057
> W: http://imesh.gunaratne.org
> Lean . Enterprise . Middleware
>
>
--
Thanuja Uruththirakodeeswaran
Software Engineer
WSO2 Inc.;http://wso2.com
lean.enterprise.middleware
mobile: +94 774363167
_______________________________________________
Dev mailing list
[email protected]
http://wso2.org/cgi-bin/mailman/listinfo/dev