structured streaming bookkeeping formats

2018-10-27 Thread Koert Kuipers
I was reading this blog post from last year about the Structured Streaming
run-once trigger:
https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html

It's a nice idea to replace a batch job with Structured Streaming, because it
does the bookkeeping (what's new, failure recovery, etc.) for you.
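
For reference, the run-once pattern the post describes boils down to roughly the
following. This is only a sketch assuming Spark 2.2+ (where Trigger.Once() exists);
the paths, formats, and inputSchema are placeholders, not anything taken from the post:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.types.StructType;

public class RunOnceIngest {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder().appName("run-once-ingest").getOrCreate();

        // Placeholder schema for the incoming JSON files (an assumption, not from the post)
        StructType inputSchema = new StructType().add("id", "string").add("payload", "string");

        Dataset<Row> incoming = spark.readStream()
                .format("json")
                .schema(inputSchema)
                .load("/data/incoming");                       // placeholder input path

        incoming.writeStream()
                .format("parquet")
                .option("checkpointLocation", "/chk/ingest")   // where the bookkeeping lives
                .trigger(Trigger.Once())                       // process whatever is new, then stop
                .start("/data/output")                         // placeholder output path
                .awaitTermination();
    }
}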

But that's also the part that scares me a bit. When it's all done for me and
it breaks anyway, I am not sure I know how to recover, and I am unsure how
to upgrade.
So... are the formats that Spark Structured Streaming uses for
"bookkeeping" easily readable (say, JSON) and stable? Do they consist
of files I can go look at, understand, and edit/manipulate myself if
needed? Are there any references to the format used?
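
For what it's worth, a quick way to see what is actually on disk is to point
something like the sketch below at a checkpoint directory. The layout in the
comments is what Spark 2.x is observed to write; it is an internal detail rather
than a documented, stable format, so treat any hand-editing as being at your own risk:

// Observed Spark 2.x checkpoint layout (implementation detail, not a contract):
//   <checkpoint>/metadata            - query id (JSON)
//   <checkpoint>/offsets/<batchId>   - "v1" header line, then JSON per source
//   <checkpoint>/commits/<batchId>   - written once the batch has committed
//   <checkpoint>/sources/, state/    - source metadata and state-store files
import java.nio.file.*;

public class InspectCheckpoint {
    public static void main(String[] args) throws Exception {
        Path offsets = Paths.get(args[0], "offsets");   // args[0] = checkpoint directory
        try (DirectoryStream<Path> batches = Files.newDirectoryStream(offsets)) {
            for (Path batch : batches) {
                System.out.println("== batch " + batch.getFileName() + " ==");
                Files.readAllLines(batch).forEach(System.out::println);
            }
        }
    }
}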

thank you!

best,
koert


Re: SIGBUS (0xa) when using DataFrameWriter.insertInto

2018-10-27 Thread alexzautke
Now it's uploaded! Thanks :)






Re: SIGBUS (0xa) when using DataFrameWriter.insertInto

2018-10-27 Thread Ted Yu
I don't seem to find the log.
Can you double check?
Thanks

 Original message 
From: alexzautke
Date: 10/27/18 8:54 AM (GMT-08:00)
To: user@spark.apache.org
Subject: Re: SIGBUS (0xa) when using DataFrameWriter.insertInto

Please also find attached a complete error log.






Re: Is spark not good for ingesting into updatable databases?

2018-10-27 Thread ravidspark
Hi Jörn,

Thanks for your kind reply. I do accept that there might be something in the
code; any help would be appreciated.

To give you some insight, I checked in Kafka whether the source message had been
repeated, but I could only find it once. It would also have been more convincing
if all the messages were duplicated instead of only a few.
Please find below my source code and a snapshot of the message that is
getting duplicated throughout the logs:




// NOTE: the generic type parameters below were stripped by the list archive;
// Map<String, Object> is assumed throughout.
JavaDStream<Map<String, Object>> prePepForMappedJsonStream =
        stream.map(new Function<String, Map<String, Object>>() {

    // instance field: retains the previous record's value when the length check is skipped
    Map<String, Object> mappedJson = null;

    @Override
    public Map<String, Object> call(String inputJsonMessage) {
        try {
            if (StringUtils.length(inputJsonMessage) != 2) {
                mappedJson = new HashMap<>();
                StopWatch watch = StopWatchSingleton.instance();
                watch.reset();
                watch.start();
                logger.info("Transformation-1 Start:{} & Input Message is: {}",
                        LocalDateTime.now(), inputJsonMessage);
                JsonToMapPrePepTransformer instance = new JsonToMapPrePepTransformer(
                        StringUtils.join(store.getName().toLowerCase(), "_yellowbrick"));
                mappedJson = instance.transformJsonToMap(inputJsonMessage);
                watch.stop();
                logger.info("Transformation-1 End:{}, Elapsed:{} & OutputMessage is: {}",
                        LocalDateTime.now(), watch.getTime(), mappedJson);
            }
        } catch (Exception e) {
            logger.error("", e);
        }
        return mappedJson;
    }
});

JavaDStream<Map<String, Object>> transformedStream =
        prePepForMappedJsonStream.map(new Function<Map<String, Object>, Map<String, Object>>() {

    Map<String, Object> resultMap = null;

    @Override
    public Map<String, Object> call(Map<String, Object> readyToTransformMap) throws Exception {
        if (readyToTransformMap != null) {
            StopWatch watch = StopWatchSingleton.instance();
            watch.reset();
            watch.start();
            logger.info("Transformation-2 Start:{} & Input Message is: {}",
                    LocalDateTime.now(), readyToTransformMap);
            resultMap = new HashMap<>();
            resultMap = YBEDFormatter.init(
                    StringUtils.join(store.getName().toLowerCase(), "_yellowbrick"),
                    readyToTransformMap);
            watch.stop();
            logger.info("Transformation-2 End:{}, Elapsed:{} & OutputMessage is: {}",
                    LocalDateTime.now(), watch.getTime(), resultMap);
        }
        return resultMap;
    }
});

JavaDStream<ResultMapHolder> kafkaPreIngestStream =
        transformedStream.map(new Function<Map<String, Object>, ResultMapHolder>() {

    ResultMapHolder resultMapBean = null;

    @Override
    public ResultMapHolder call(Map<String, Object> finalTransformedMap) throws Exception {
        try {
            if (finalTransformedMap != null) {
                StopWatch watch = StopWatchSingleton.instance();
                watch.reset();
                watch.start();
                logger.info("Transformation-3 Start:{} & Input Message is: {}",
                        LocalDateTime.now(), finalTransformedMap);
                resultMapBean = MapToArrayTransformerForYBIngestion.instance()
                        .transformMapToOrderedArrayOfValues(finalTransformedMap, tableColumns);
                watch.stop();
                logger.info("Transformation-3 End:{}, Elapsed:{} & OutputMessage is: {}",
                        LocalDateTime.now(), watch.getTime(),
                        Arrays.toString(resultMapBean.getOutputRow()));
            }
        } catch (Exception e) {
            logger.error("", e);
        }
        return resultMapBean;
    }
});



Please observe 

Re: SIGBUS (0xa) when using DataFrameWriter.insertInto

2018-10-27 Thread alexzautke
Please also find attached a complete error log.






SIGBUS (0xa) when using DataFrameWriter.insertInto

2018-10-27 Thread alexzautke
Hi everyone!

I am currently running into the issue that a call to
DataFrameWriter.insertInto is reproducibly crashing the JVM.

#
# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGBUS (0xa) at pc=0x0001194a3520, pid=16154, tid=0x8417
#
# JRE version: Java(TM) SE Runtime Environment (8.0_121-b13) (build 1.8.0_121-b13)
# Java VM: Java HotSpot(TM) 64-Bit Server VM (25.121-b13 mixed mode bsd-amd64 compressed oops)
# Problematic frame:
# v  ~StubRoutines::jshort_disjoint_arraycopy
#
# Failed to write core dump. Core dumps have been disabled. To enable core dumping, try "ulimit -c unlimited" before starting Java again
#
# If you would like to submit a bug report, please visit:
#   http://bugreport.java.com/bugreport/crash.jsp
#

The last call before the crash is made to
org.apache.spark.unsafe.types.UTF8String.getBytes().
Is there anyone who is willing to help me debug this issue?

Thank you all in advance!
Alexander Zautke






RE: External shuffle service on K8S

2018-10-27 Thread Garlapati, Suryanarayana (Nokia - IN/Bangalore)
Hi,
There is an unmerged PR which can be used against Spark 2.4 (if you are
interested) or the master branch (3.0). Spark 2.3 on K8S lacks a lot of features. I
suggest you upgrade to 2.4, which will be released in a few days from now.

https://github.com/apache/spark/pull/22722

Regards
Surya

From: Matt Cheah 
Sent: Saturday, October 27, 2018 6:12 AM
To: Li Gao ; vincent.gromakow...@gmail.com
Cc: caolijun1...@gmail.com; user@spark.apache.org
Subject: Re: External shuffle service on K8S

Hi there,

Please see https://issues.apache.org/jira/browse/SPARK-25299 for more 
discussion around this matter.

-Matt Cheah

From: Li Gao <eyesofho...@gmail.com>
Date: Friday, October 26, 2018 at 9:10 AM
To: "vincent.gromakow...@gmail.com" <vincent.gromakow...@gmail.com>
Cc: "caolijun1...@gmail.com" <caolijun1...@gmail.com>,
"user@spark.apache.org" <user@spark.apache.org>
Subject: Re: External shuffle service on K8S

There is an existing 2.2-based external shuffle service on the fork:
https://apache-spark-on-k8s.github.io/userdocs/running-on-kubernetes.html

You can modify it to suit your needs.

-Li


On Fri, Oct 26, 2018 at 3:22 AM vincent gromakowski
<vincent.gromakow...@gmail.com> wrote:
No, it's on the roadmap for >2.4.

On Fri, Oct 26, 2018 at 11:15, 曹礼俊 <caolijun1...@gmail.com> wrote:
Hi all:

Does Spark 2.3.2 support the external shuffle service on Kubernetes?

I have looked at the documentation
(https://spark.apache.org/docs/latest/running-on-kubernetes.html),
but couldn't find any related guidance.

If it is supported, how can I enable it?

Best Regards

Lijun Cao
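
For completeness, the sketch below shows only the standard keys that switch on the
external shuffle service with dynamic allocation on YARN or standalone; on Kubernetes
in 2.3/2.4 these alone are not sufficient, since upstream support is still being
tracked (see SPARK-25299 and the PR linked above):

import org.apache.spark.SparkConf;

// Sketch only: standard external shuffle service configuration keys.
// On K8S in Spark 2.3/2.4 these do not by themselves enable a working shuffle service.
SparkConf conf = new SparkConf()
        .set("spark.dynamicAllocation.enabled", "true")
        .set("spark.shuffle.service.enabled", "true");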




Re: Is spark not good for ingesting into updatable databases?

2018-10-27 Thread Jörn Franke
Do you have some code that you can share?

Maybe it is something in your code that unintentionally duplicates it?

Maybe your source (e.g. the application putting it on Kafka?) duplicates them
already?
Once-and-only-once (exactly-once) processing needs to be done end to end.
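
As a rough illustration of that point: a within-batch dedup in the DStream pipeline
(using the kafkaPreIngestStream and ResultMapHolder from your code) only removes
duplicates inside one micro-batch. The key accessor getMessageId() below is
hypothetical, and since Spark's output semantics are at-least-once, the sink still
has to tolerate replays:

import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import scala.Tuple2;

// Sketch only: dedup records within each micro-batch by a unique key.
JavaPairDStream<String, ResultMapHolder> keyed = kafkaPreIngestStream
        .mapToPair(r -> new Tuple2<String, ResultMapHolder>(r.getMessageId(), r)); // hypothetical key

JavaDStream<ResultMapHolder> deduped = keyed
        .reduceByKey((a, b) -> a)   // keep one record per key within the batch
        .map(t -> t._2());          // back to a plain stream of records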

> Am 27.10.2018 um 02:10 schrieb ravidspark :
> 
> Hi All,
> 
> My problem is as explained,
> 
> Environment: Spark 2.2.0 installed on CDH
> Use-Case: Reading from Kafka, cleansing the data, and ingesting into a
> non-updatable database.
> 
> Problem: My streaming batch duration is 1 minute and I am receiving 3000
> messages/min. I am observing a weird case where, in the map transformations,
> some of the messages are reprocessed more than once by the downstream
> transformations. Because of this I have been seeing duplicates in the
> downstream insert-only database.
> 
> It would have made sense if the reprocessing happened for the entire task,
> in which case I would have assumed the problem was a task failure. But in my
> case I don't see any task failures, and only one or two particular messages
> in the task are reprocessed.
> 
> Every time I relaunch the Spark job to process Kafka messages from the
> starting offset, it duplicates the exact same messages, irrespective of the
> number of relaunches.
> 
> I added the messages that were getting duplicated back to Kafka at a
> different offset to see whether I would observe the same problem, but this
> time they were not duplicated.
> 
> Workaround for now:
> As a workaround, I added a cache before ingestion into the DB, which is
> updated with each processed event, thus making sure an event won't be
> reprocessed again.
> 
> 
> My question here is, why am I seeing this weird behavior (only one particular
> message in the entire batch getting reprocessed)? Is there some
> configuration that would help me fix this problem, or is this a bug?
> 
> Any solution apart from maintaining a cache would be of great help.
> 
> 
> 
